Project 5: Collect trading data using the Yahoo Finance API and use online regression to predict the stock prices of Google, Facebook and Amazon.
Option 2: For each of these 5 countries, use the stock data of one major company. For example: Google in the US, BNP Paribas in France, Alibaba in China, and a major international company in Russia or England. This option was initially given in the project.
For each option, each group should use at least 3 different data streams and apply online, adaptive regression with RIVER (such as https://riverml.xyz/latest/api/tree/HoeffdingAdaptiveTreeRegressor/). Each group should then compare its performance with a batch regression model (scikit-learn).
TODO: Compare online regression with batch regression and discuss their performance.
Bonus: Use recent stock market data (from January to March 2022).
Online resources: you can use the yfinance Python library (https://pypi.org/project/yfinance/) to collect streaming Yahoo Finance data. For feature engineering, you can compute time-series statistics and moving-average indicators (such as MACD) with statsmodels (https://www.statsmodels.org/stable/tsa.html).
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
%load_ext autoreload
%autoreload 2
%matplotlib inline
The autoreload extension is already loaded. To reload it, use: %reload_ext autoreload
!pip3 install --quiet -r requirements.txt
import sys
print("Python version: {}". format(sys.version))
import IPython
from IPython import display
from IPython.display import Markdown, display, clear_output
print("IPython version: {}". format(IPython.__version__))
import numpy as np
print("NumPy version: {}". format(np.__version__))
import scipy as sp
from scipy import stats
print("SciPy version: {}". format(sp.__version__))
import pandas as pd
print("pandas version: {}". format(pd.__version__))
import matplotlib
import matplotlib.pyplot as plt
print("matplotlib version: {}". format(matplotlib.__version__))
import seaborn as sns
print("seaborn version : {}". format(sns.__version__))
sns.set()
import kafka
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer
from kafka import KafkaConsumer
print("kafka version : {}". format(kafka.__version__))
import yfinance as yf
print("yfinance version : {}". format(yf.__version__))
import statsmodels
print("statsmodels version : {}". format(statsmodels.__version__))
import statsmodels.api as sm
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.stattools import adfuller, kpss
import sklearn
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error, mean_absolute_error, r2_score
print("scikit-learn version : {}". format(sklearn.__version__))
import river
from river import base
from river import compose
from river import metrics
from river import preprocessing
from river import evaluate
from river import tree
from river import ensemble
from river import linear_model
from river import stats
from river import optim
print("river version : {}". format(river.__version__))
import ipywidgets as widgets
print("ipywidgets version : {}". format(widgets.__version__))
import time
from time import perf_counter
from datetime import datetime, date, timedelta
from enum import Enum, auto
import math
import pprint
import json
import urllib.request
import warnings
warnings.filterwarnings('ignore')
print("====================================")
Python version: 3.9.7 (default, Sep 16 2021, 16:59:28) [MSC v.1916 64 bit (AMD64)] IPython version: 7.27.0 NumPy version: 1.20.3 SciPy version: 1.7.1 pandas version: 1.3.2 matplotlib version: 3.4.2 seaborn version : 0.11.2 kafka version : 2.0.2 yfinance version : 0.1.70 statsmodels version : 0.13.2 scikit-learn version : 1.0.1 river version : 0.9.0 ipywidgets version : 7.6.5 ====================================
def printmd(text, couleur=None):
    """Display *text* as Markdown, wrapped in an HTML span with a CSS colour.

    Parameters
    ----------
    text : str
        Markdown content to render; it may itself carry emphasis such as
        ``**bold**``.
    couleur : str, optional
        CSS colour ("couleur" is French for colour) written into the span's
        ``style`` attribute. When left as ``None`` the literal text ``None``
        is written there.
    """
    html = f"<span style='color:{couleur}'>{text}</span>"
    display(Markdown(html))
def plot_the_results(dates, y_trues, y_preds, company):
    """Plot true versus predicted closing prices of *company* over *dates*.

    Parameters
    ----------
    dates : sequence
        X-axis values shared by both curves.
    y_trues : sequence of float
        Observed closing prices (drawn in green).
    y_preds : sequence of float
        Online predictions (drawn in red).
    company : str
        Company name inserted in the figure title.
    """
    _, ax = plt.subplots(figsize=(15, 8))
    ax.grid(alpha=0.75)
    curves = (
        (y_trues, '#2ecc71', 'True value'),
        (y_preds, '#e74c3c', 'Prediction'),
    )
    for values, colour, caption in curves:
        ax.plot(dates, values, lw=3, color=colour, alpha=0.8, label=caption)
    ax.legend()
    ax.set_title("On line prediction of the closing price for the {} stock".format(company))
    plt.show()
def plot_the_accuracy(dates, CPU, MAE, CPU_Batch, MAE_Batch, company):
    """Compare the online (RIVER) CPU time and MAE with the batch (ARIMA) model.

    Draws one figure with two side-by-side subplots: CPU time on the left,
    MAE on the right. The online metrics are plotted against ``dates``. Each
    batch metric is a single value, drawn as a horizontal segment from the
    first to the last date.

    Parameters
    ----------
    dates : sequence
        X-axis values, one per online prediction.
    CPU : sequence of float
        Online CPU time for each entry of ``dates``.
    MAE : sequence of float
        Online MAE for each entry of ``dates``.
    CPU_Batch : float
        CPU time of the batch model.
    MAE_Batch : float
        MAE of the batch model.
    company : str
        Company name shown in the subplot titles.

    Returns
    -------
    None
        Nothing is drawn when ``dates`` is empty, because there is no date
        range for the batch segments to span.
    """
    # len() rather than truthiness so NumPy arrays / pandas objects also work.
    if len(dates) == 0:
        return

    # Endpoints of the constant batch reference segments. Positional
    # len()-1 indexing (not [-1]) also works for integer-indexed Series.
    date_batch = [dates[0], dates[len(dates) - 1]]

    plt.figure(figsize=(15, 8))

    plt.subplot(1, 2, 1)
    plt.plot(dates, CPU, label='RIVER')
    plt.plot(date_batch, [CPU_Batch, CPU_Batch], label='ARIMA')
    plt.legend(loc='lower right')
    plt.title('CPU Time for Batch and On-Line training ({})'.format(company))

    plt.subplot(1, 2, 2)
    plt.plot(dates, MAE, label='RIVER')
    plt.plot(date_batch, [MAE_Batch, MAE_Batch], label='ARIMA')
    plt.legend(loc='upper right')
    plt.title('MAE for Batch and On-Line training ({})'.format(company))

    plt.show()
LAUNCHING THE ZOOKEEPER AND KAFKA SERVERS ON WINDOWS
In a first terminal, run the following commands:
cd %KAFKA_DIR%
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
In a second terminal, run the following commands:
cd %KAFKA_DIR%
.\bin\windows\kafka-server-start.bat .\config\server.properties
--
We assume that ZooKeeper is running with its default configuration on localhost:2181 and Kafka on localhost:9092.
We create a Kafka topic for each company and stream the stock data retrieved with yfinance into these topics.
In another notebook, we will retrieve this stock market data and apply a machine learning model with RIVER.
In the last notebook, we plot the prediction results from the data stored in the Kafka prediction topics (`predict__<company>`).
All_the_companies = ["google",
                     "facebook",
                     "amazon",
                     "total",
                     "gazprom",
                     "alibaba",
                     "bnp_paribas",
                     "BP",
                     "ferrari"]
trace_plotting = True  # print progress messages while plotting
Nb_to_update = 500     # redraw the plots every Nb_to_update received messages
sommeil = 2            # pause in seconds around each redraw ("sommeil" = sleep)
pp = pprint.PrettyPrinter()

admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092", client_id="Project_DataStream")

# Per company: one topic for the stock data, one "predict__<company>" topic
# for the prediction results.
topic_name = {company: company for company in All_the_companies}
topic_predict_name = {company: "predict__{}".format(company)
                      for company in All_the_companies}

# Ask the broker for its topics once (instead of twice per company) and
# create every missing topic in a single request.
existing_topics = set(admin_client.list_topics())
missing_topics = [
    NewTopic(name=name, num_partitions=1, replication_factor=1)
    for company in All_the_companies
    for name in (topic_name[company], topic_predict_name[company])
    if name not in existing_topics
]
if missing_topics:
    admin_client.create_topics(new_topics=missing_topics, validate_only=False)
# Position (0 to 8) in All_the_companies of the company whose results are plotted.
No_Company = 7
The_selected_company = All_the_companies[No_Company]
print(f"The selected company is {The_selected_company}")
The selected company is BP
def _refresh_plots(dates, y_true, y_pred, CPU, MAE, CPU_Batch, MAE_Batch, company):
    """Replace the cell output with up-to-date prediction and accuracy plots."""
    # wait=True defers the clearing until the new figures are emitted.
    clear_output(wait=True)
    time.sleep(sommeil)
    plot_the_results(dates, y_true, y_pred, company)
    plot_the_accuracy(dates, CPU, MAE, CPU_Batch, MAE_Batch, company)
    time.sleep(sommeil)


def plot_result(CPU_Batch, MAE_Batch, The_selected_company):
    """Consume a company's prediction topic and plot the results live.

    Reads JSON messages from the Kafka topic ``predict__<company>``. Each
    message must provide the keys ``date`` (epoch milliseconds), ``y_true``,
    ``y_pred``, ``CPU_time`` and ``MAE``. Every ``Nb_to_update`` messages the
    prediction and accuracy plots are redrawn. Interrupting the kernel
    (KeyboardInterrupt) draws the final plots and returns. Any other error is
    printed rather than raised.

    Parameters
    ----------
    CPU_Batch : float
        CPU time of the batch model, drawn as a constant reference line.
    MAE_Batch : float
        MAE of the batch model, drawn as a constant reference line.
    The_selected_company : str
        Company whose ``predict__<company>`` topic is consumed.

    Relies on the notebook-level settings ``trace_plotting`` (progress
    messages), ``Nb_to_update`` (redraw period, in messages) and ``sommeil``
    (pause in seconds around each redraw).
    """
    if trace_plotting:
        print("----- PLOTTING THE RESULTS FOR {} ----".format(The_selected_company))

    predict_topic = "predict__{}".format(The_selected_company)
    # NOTE(review): kafka-python's auto_offset_reset defaults to "latest", so a
    # consumer group without committed offsets only receives messages produced
    # after it joins. Pass auto_offset_reset="earliest" to replay stored results.
    consumer = KafkaConsumer(predict_topic,
                             bootstrap_servers='localhost:9092',
                             group_id="plot_results")

    nb_msg = 0
    dates, y_true, y_pred, CPU, MAE = [], [], [], [], []
    try:
        # Infinite loop: the consumer keeps waiting for new messages until
        # the kernel is interrupted.
        for message in consumer:
            nb_msg += 1
            result = json.loads(message.value.decode())
            # "date" is in milliseconds; fromtimestamp() expects seconds.
            dates.append(datetime.fromtimestamp(int(result["date"] / 1000)))
            y_true.append(result["y_true"])
            y_pred.append(result["y_pred"])
            CPU.append(result["CPU_time"])
            MAE.append(result["MAE"])
            if nb_msg % Nb_to_update == 0:
                _refresh_plots(dates, y_true, y_pred, CPU, MAE,
                               CPU_Batch, MAE_Batch, The_selected_company)
                # Printed after the redraw so clear_output() does not erase it.
                if trace_plotting:
                    print("update the plots ({} points)".format(nb_msg))
    except KeyboardInterrupt:
        # Final redraw; skipped when no message arrived (nothing to plot).
        if dates:
            _refresh_plots(dates, y_true, y_pred, CPU, MAE,
                           CPU_Batch, MAE_Batch, The_selected_company)
        if trace_plotting:
            print("update the plots ({} points)".format(nb_msg))
        print("\n------- END OF PLOTTING THE RESULTS -------")
    except Exception as e:
        # Best-effort notebook cell: report the problem instead of raising.
        print("An error has occurred")
        print(e)
    finally:
        # Always release the consumer's connections, however the loop ended.
        consumer.close()
# Load the batch-model performance of every company: a semicolon-separated
# CSV that uses commas as decimal separators, with its first column as index.
df_Batch_results = pd.read_csv(
    "ARMA_performance.csv",
    sep=";",
    decimal=",",
    index_col=0,
)
df_Batch_results
|  | CPU_time | MAE | date | Regression | Company |
|---|---|---|---|---|---|
| 0 | 0.798112 | 90.471753 | 1648719422000 | Batch | |
| 1 | 0.248773 | 11.778923 | 1648719422000 | Batch | |
| 2 | 0.419485 | 230.395238 | 1648719422000 | Batch | amazon |
| 3 | 0.182358 | 0.928441 | 1648719422000 | Batch | total |
| 4 | 0.462654 | 2.624916 | 1648719422000 | Batch | gazprom |
| 5 | 0.222039 | 12.128438 | 1648719422000 | Batch | alibaba |
| 6 | 0.784485 | 0.842633 | 1648719422000 | Batch | bnp_paribas |
| 7 | 0.696678 | 1.187801 | 1648719422000 | Batch | BP |
| 8 | 0.157458 | 9.012090 | 1648719422000 | Batch | ferrari |
# Batch-model metrics of the selected company, drawn as constant reference
# lines next to the online (RIVER) metrics.
Condition = df_Batch_results["Company"] == The_selected_company
batch_rows = df_Batch_results[Condition]
if batch_rows.empty:
    # Without this check .values[0] fails with an opaque IndexError, e.g. for
    # rows whose Company cell is empty in the CSV.
    raise ValueError(
        "No batch results found for company {!r} in ARMA_performance.csv".format(
            The_selected_company))
CPU_Batch = batch_rows["CPU_time"].values[0]
MAE_Batch = batch_rows["MAE"].values[0]
plot_result(CPU_Batch, MAE_Batch, The_selected_company)
update the plots (2187 points) ------- END OF PLOTTING THE RESULTS -------