| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- import psycopg2.extras
def get_timestamp_previous(this_db):
    """
    Retrieve the second-to-last distinct timestamp from the stocks table.

    :param this_db: open database connection whose ``cursor()`` accepts a
        ``cursor_factory`` keyword (psycopg2-style)
    :return timestamp: the second most recent distinct ``timestamp`` value,
        or ``None`` when the table holds fewer than two distinct timestamps
    """
    query = """SELECT DISTINCT timestamp FROM stocks ORDER BY stocks.timestamp DESC LIMIT 1 OFFSET 1;"""
    cursor = this_db.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor)
    try:
        cursor.execute(query)
        res = cursor.fetchone()
    finally:
        # Release the cursor even when the query raises.
        cursor.close()
    # fetchone() yields None when no row matches; do not dereference it.
    if res is None:
        return None
    return res.timestamp
def get_timestamp_latest(this_db):
    """
    Retrieve the most recent timestamp from the stocks table.

    :param this_db: open database connection whose ``cursor()`` accepts a
        ``cursor_factory`` keyword (psycopg2-style)
    :return timestamp: the most recent ``timestamp`` value, or ``None`` when
        the table is empty
    """
    query = """SELECT DISTINCT timestamp FROM stocks ORDER BY stocks.timestamp DESC LIMIT 1;"""
    cursor = this_db.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor)
    try:
        cursor.execute(query)
        res = cursor.fetchone()
    finally:
        # Release the cursor even when the query raises.
        cursor.close()
    # fetchone() yields None when no row matches; do not dereference it.
    if res is None:
        return None
    return res.timestamp
def get_timestamp_stored():
    """
    Retrieve the timestamp stored in ``timestamp.txt``.

    The file content is accepted when, after stripping surrounding
    whitespace, it consists of digits with at most one decimal point.
    A fractional value is truncated to its integer part.

    :return timestamp: the stored value as an ``int``; ``0`` when the file
        is missing, empty, or does not contain a non-negative number
    """
    try:
        with open('timestamp.txt', 'r') as f:
            # Strip so a trailing newline does not invalidate the value.
            text = f.read().strip()
    except FileNotFoundError:
        # Nothing stored yet: same fallback as unparsable content.
        return 0

    if not text.replace('.', '', 1).isdigit():
        return 0

    # Try the exact integer parse first so large values keep full precision.
    try:
        return int(text)
    except ValueError:
        pass
    # Values with a decimal point ("123.45", "5.") are truncated via float.
    try:
        return int(float(text))
    except (ValueError, OverflowError):
        return 0
def put_timestamp_stored(this_timestamp):
    """
    Persist the given timestamp to ``timestamp.txt``.

    The file is opened in write mode, so any previous content is replaced
    by the default string formatting of ``this_timestamp``.

    :param this_timestamp: value to store
    :return: None
    """
    with open('timestamp.txt', 'w') as handle:
        serialized = "{}".format(this_timestamp)
        handle.write(serialized)
def get_data(stock_id, this_timestamp, this_db):
    """
    Retrieve one row of detailed data from the stocks table.

    :param stock_id: value matched against the ``stock_id`` column
    :param this_timestamp: value matched against the ``timestamp`` column
    :param this_db: open database connection whose ``cursor()`` accepts a
        ``cursor_factory`` keyword (psycopg2-style)
    :return: a named tuple with ``current_price``, ``available_shares``,
        ``forecast`` and ``total_shares``, or ``None`` when no row matches
    """
    query = """SELECT current_price, available_shares, forecast, total_shares
               FROM stocks
               WHERE stock_id = %s AND timestamp = %s"""
    cursor = this_db.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor)
    try:
        # Values are passed as parameters so the driver handles quoting.
        cursor.execute(query, (stock_id, this_timestamp))
        this_data = cursor.fetchone()
    finally:
        # Release the cursor even when the query raises.
        cursor.close()
    return this_data
def process_data(data_previous, data_latest, price, threshold):
    """
    Decide whether the change between two snapshots counts as a drop.

    A drop is reported when the latest price is below ``price`` and the
    valued available shares grew by more than ``threshold * 1e9``.

    :param data_previous: earlier snapshot; ``available_shares`` is read
    :param data_latest: newer snapshot; ``available_shares`` and
        ``current_price`` are read
    :param price: upper bound (exclusive) for the latest price
    :param threshold: minimum growth, expressed in units of 1e9
    :return boolean: True when both conditions hold, otherwise False
    """
    # NOTE(review): both valuations use the latest price, so the difference
    # reflects only the change in available shares - confirm this is intended.
    value_before = data_previous.available_shares * float(data_latest.current_price)
    value_after = data_latest.available_shares * float(data_latest.current_price)

    if not data_latest.current_price < price:
        return False
    if value_after - value_before > threshold * 1e9:
        return True
    return False
def is_npc_drop(data_previous, data_latest):
    """
    Report whether the total number of shares grew between two snapshots.

    An NPC drop is detected here as an increase in ``total_shares``. When
    one is found, both totals are printed for inspection.

    :param data_previous: earlier snapshot; ``total_shares`` is read
    :param data_latest: newer snapshot; ``total_shares`` is read
    :return: True when ``total_shares`` increased, otherwise False
    """
    if data_latest.total_shares - data_previous.total_shares > 0:
        print('data_latest.total_shares {}'.format(data_latest.total_shares))
        # The attribute is ``total_shares``; the former ``totals_shares``
        # typo raised AttributeError every time a drop was detected.
        print('data_previous.total_shares {}'.format(data_previous.total_shares))
        return True
    return False
def process_forecast(data_previous, data_latest, max_price, available):
    """
    Decide whether the forecast improved in a way worth reporting.

    The result is True only when the previous forecast was "Very poor",
    "Poor" or "Average", the latest forecast is "Good" or "Very good",
    the latest price does not exceed ``max_price``, and the latest
    available shares are strictly greater than ``available``.

    :param data_previous: earlier snapshot; ``forecast`` is read
    :param data_latest: newer snapshot; ``forecast``, ``current_price`` and
        ``available_shares`` are read
    :param max_price: highest acceptable latest price (inclusive)
    :param available: number of available shares that must be exceeded
    :return: True when all conditions hold, otherwise False
    """
    improved = (
        data_previous.forecast in ("Very poor", "Poor", "Average")
        and data_latest.forecast in ("Good", "Very good")
    )
    # Both limits are always evaluated, whatever the forecast outcome.
    over_budget = bool(data_latest.current_price > max_price)
    too_scarce = bool(data_latest.available_shares <= available)
    return improved and not (over_budget or too_scarce)
|