import psycopg2.extras

# File (relative to the current working directory) that remembers the
# timestamp of the last database check between program runs.
_TIMESTAMP_FILE = 'timestamp.txt'

# Forecast labels that process_forecast treats as the "before" and "after"
# side of an upgrade.
_WEAK_FORECASTS = ("Very poor", "Poor", "Average")
_STRONG_FORECASTS = ("Good", "Very good")


def _fetch_timestamp(this_db, query):
    """
    Run a single-row timestamp query and return its ``timestamp`` column

    :param this_db: open database connection providing ``cursor()``
    :param query: SQL statement selecting a ``timestamp`` column
    :return timestamp: the value of the first row, or None if no row matched
    """
    # The with-block closes the cursor even when execute() raises.
    with this_db.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cursor:
        cursor.execute(query)
        row = cursor.fetchone()
    # fetchone() yields None when the result set is empty.
    return None if row is None else row.timestamp


def get_timestamp_previous(this_db):
    """
    Retrieve second to last timestamp from the stocks database

    :param this_db: open database connection
    :return timestamp: second most recent distinct timestamp, or None when
        the table holds fewer than two distinct timestamps
    """
    query = """SELECT DISTINCT timestamp
               FROM stocks
               ORDER BY stocks.timestamp DESC
               LIMIT 1 OFFSET 1;"""
    return _fetch_timestamp(this_db, query)


def get_timestamp_latest(this_db):
    """
    Retrieve latest timestamp from the stocks database

    :param this_db: open database connection
    :return timestamp: most recent timestamp, or None when the table is empty
    """
    query = """SELECT DISTINCT timestamp
               FROM stocks
               ORDER BY stocks.timestamp DESC
               LIMIT 1;"""
    return _fetch_timestamp(this_db, query)


def get_timestamp_stored() -> int:
    """
    Retrieve the timestamp when the program checked the database

    The value is read from ``timestamp.txt``. Surrounding whitespace is
    ignored and a value with a fractional part is truncated to its integer
    part. A missing file or any content that is not a plain non-negative
    number yields 0.

    :return timestamp: stored timestamp as an int, or 0 when unavailable
    """
    try:
        with open(_TIMESTAMP_FILE, 'r') as f:
            text = f.read().strip()
    except FileNotFoundError:
        # Nothing has been stored yet; same fallback as unparsable content.
        return 0

    # Split on the first dot only, so "1.2.3" leaves a dot in `fraction`
    # and is rejected by the digit check below.
    whole, _, fraction = text.partition('.')
    if not (whole + fraction).isdecimal():
        return 0
    # ".5" has an empty integer part, which int() would reject.
    return int(whole) if whole else 0


def put_timestamp_stored(this_timestamp):
    """
    Update timestamp when the program checked the database

    :param this_timestamp: value to persist; written in its string form
    :return:
    """
    with open(_TIMESTAMP_FILE, 'w') as f:
        f.write("{}".format(this_timestamp))


def get_data(stock_id, this_timestamp, this_db):
    """
    Retrieves detailed data from the existing stocks database

    :param stock_id: identifier of the stock to look up
    :param this_timestamp: timestamp of the snapshot to look up
    :param this_db: open database connection
    :return: named-tuple row with current_price, available_shares, forecast
        and total_shares, or None when no row matches
    """
    query = """SELECT current_price, available_shares, forecast, total_shares
               FROM stocks
               WHERE stock_id = %s AND timestamp = %s"""
    # The with-block closes the cursor even when execute() raises.
    with this_db.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cursor:
        cursor.execute(query, (stock_id, this_timestamp))
        return cursor.fetchone()


def process_data(data_previous, data_latest, price, threshold) -> bool:
    """
    Calculate if there is enough change to call it a drop

    A drop is reported when the latest price is below ``price`` and the
    value of the available shares grew by more than ``threshold * 1e9``.

    :param data_previous: row of the previous snapshot (see get_data)
    :param data_latest: row of the latest snapshot (see get_data)
    :param price: the latest price must be strictly below this value
    :param threshold: minimum value increase, multiplied by 1e9 before use
    :return boolean: True when both conditions hold
    """
    # Both snapshots are valued at the LATEST price, so the difference only
    # reflects the change in available share count.
    # NOTE(review): confirm the previous snapshot is not meant to use
    # data_previous.current_price instead.
    latest_price = float(data_latest.current_price)
    value_previous = data_previous.available_shares * latest_price
    value_latest = data_latest.available_shares * latest_price
    return bool(
        data_latest.current_price < price
        and value_latest - value_previous > threshold * 1e9
    )


def is_npc_drop(data_previous, data_latest) -> bool:
    """
    A NPC drop is detected as an increase in total shares between snapshots

    Prints both total share counts when a drop is detected.

    :param data_previous: row of the previous snapshot (see get_data)
    :param data_latest: row of the latest snapshot (see get_data)
    :return: True when total_shares grew, False otherwise
    """
    if data_latest.total_shares - data_previous.total_shares > 0:
        print('data_latest.total_shares {}'.format(data_latest.total_shares))
        print('data_previous.total_shares {}'.format(data_previous.total_shares))
        return True
    return False


def process_forecast(data_previous, data_latest, max_price, available) -> bool:
    """
    Decide whether the forecast upgraded on a stock that is still worth a look

    :param data_previous: row of the previous snapshot (see get_data)
    :param data_latest: row of the latest snapshot (see get_data)
    :param max_price: the latest price must not exceed this value
    :param available: the latest available shares must exceed this value
    :return: True when the forecast went from "Very poor"/"Poor"/"Average"
        to "Good"/"Very good" and both the price and availability limits
        are satisfied
    """
    upgraded = (
        data_previous.forecast in _WEAK_FORECASTS
        and data_latest.forecast in _STRONG_FORECASTS
    )
    # Written as negations of the original veto conditions so that values
    # which compare False either way (e.g. NaN) keep their old outcome.
    return bool(
        upgraded
        and not data_latest.current_price > max_price
        and not data_latest.available_shares <= available
    )