给定数据集的估计方法排名#
我们通过根据各种估计方法在驳斥测试中的表现对其进行排名,来说明其在给定数据集上的比较,排名考虑了观测到的未建模混杂误差和未观测到的混杂误差。
[ ]:
# Importing all the required libraries
import sys
import argparse
import xgboost
import numpy as np
import pandas as pd
import os
import pdb
import random
from xgboost import XGBRegressor
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LassoCV
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier, GradientBoostingRegressor
from sklearn.linear_model import LogisticRegressionCV
from sklearn.metrics import mean_absolute_error
from dowhy import CausalModel
from datetime import datetime
from collections import namedtuple
import statsmodels.api as sm
from sklearn import linear_model
import dowhy
from dowhy.utils import dgp
from dowhy.utils.dgps.linear_dgp import LinearDataGeneratingProcess
from dowhy import CausalModel
from datetime import datetime
from collections import namedtuple
from dowhy.causal_refuters.add_unobserved_common_cause import AddUnobservedCommonCause
import matplotlib.pyplot as plt
import matplotlib.lines as mlines
import matplotlib.transforms as mtransforms
# Config dict to set the logging level
import logging.config
DEFAULT_LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'loggers': {
'': {
'level': 'WARN',
},
}
}
logging.config.dictConfig(DEFAULT_LOGGING)
# Disabling warnings output
import warnings
from sklearn.exceptions import DataConversionWarning, ConvergenceWarning
warnings.filterwarnings(action='ignore', category=DataConversionWarning)
warnings.filterwarnings(action='ignore', category=ConvergenceWarning)
[ ]:
def convert_singleton_to_float(arr):
'''Helper function.'''
array = []
if len(arr) == 1 and type(arr[0]) != np.ndarray:
return arr[0]
for element in arr:
while type(element) == np.ndarray or isinstance(element, list) :
if len(element) > 1:
raise ValueError("This script only accepts one value for the refute")
element = element[0]
array.append(element)
return array
[ ]:
def ensure_dir(file_path):
directory = os.path.dirname(file_path)
if not os.path.exists(directory):
os.makedirs(directory)
RESULTSFOLDER = "results/"
ensure_dir(RESULTSFOLDER)
# Create the estimator named tuple to wrap the name and properties
Estimator = namedtuple('Estimator', ['name','params'])
Refuter = namedtuple('Refuter', ['name','params'])
class Experiment():
'''
Class to define the experiment setup to compare a list of estimators across a list of refuters for the given dataset.
'''
def __init__(self, **kwargs):
self.experiment_name = kwargs['experiment_name']
self.experiment_id = kwargs['experiment_id']
self.num_experiments = kwargs['num_experiments']
self.sample_sizes = kwargs['sample_sizes']
self.dgps = kwargs['dgps']
self.estimators = kwargs['estimators']
self.refuters = kwargs['refuters']
self.results = []
self.simulate_unobserved_confounding = kwargs["simulate_unobserved_confounding"]
# Handle input errors in sample_sizes
if isinstance(self.sample_sizes, list) == False:
if type(self.sample_sizes) != int:
raise ValueError('The input to "sample_sizes" should be an int or a list')
else:
self.sample_sizes = [self.sample_sizes]
# Handle input errors in DGPs
if isinstance(self.dgps, list) == False:
if isinstance(self.dgps, DataGeneratingProcess) == False:
raise ValueError('The input to "dgps" should be a list or a subclass of "DataGeneratingProcess"')
else:
self.dgps = [self.dgps]
# Handle inputs errors in estimators
if isinstance(self.estimators, list) == False:
if isinstance(self.estimators, Estimator) == False:
raise ValueError('The input to "estimators" should be a list or an Estimator namedtuple')
else:
self.estimators = [self.estimators]
# Handle input errors in refuters
if isinstance(self.refuters, list) == False:
if isinstance(self.refuters, Refuter) == False:
raise ValueError('The input to "refuters" should be a list of a Refuter namedtuple')
else:
self.refuters = [self.refuters]
def experiment(self):
print("\n\nRunning Experiment:",self.experiment_name + '_' + str(self.experiment_id) )
for exp in range(self.num_experiments):
print("\n\nRunning Experiment Number:",exp)
for sample_size in self.sample_sizes:
print("\n\nCurrent Sample Size:",sample_size)
for dgp in self.dgps:
print("\n\nThe current DGP:")
print(dgp)
estimates = []
estimate_values = []
estimated_effect = []
new_effect = []
p_value = []
data = dgp.generate_data(sample_size)
print("printing data shape")
print(data.values.shape)
print(dgp.true_value)
print("check")
if dgp.treatment_is_binary:
data[dgp.treatment] = data[dgp.treatment].astype(bool)
#k = len(dgp.confounder)-4
#confounder_list = random.sample(dgp.confounder, k)
confounder_list = ['w2','w3']
s = set(confounder_list)
unobserved_confounders = [x for x in dgp.confounder if x not in s]
df_unobserved_confounders = pd.DataFrame(data = data[[c for c in data.columns if c in unobserved_confounders]])
df_unobserved_confounders.to_csv("results/unobserved_confounders.csv")
print("printing length of confounder list:", len(confounder_list))
print("printing confounder list:", confounder_list)
print("data columns")
print("data columns", data.columns)
model = CausalModel(
data = data,
treatment = dgp.treatment,
outcome = dgp.outcome,
common_causes = confounder_list,
effect_modifiers = dgp.effect_modifier
)
model.view_model()
from IPython.display import Image, display
display(Image(filename="causal_model.png"))
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
print("identified_estimand:", identified_estimand)
#print("identified_estimand:", identified_estimand)
print("\n\nRunning the estimators:\n")
for estimator in self.estimators:
print("The current estimator:", estimator)
print("estimator.params", estimator.params)
estimate = model.estimate_effect(
identified_estimand,
method_name = estimator.name,
method_params = estimator.params
)
print("printing estimate's type")
print(type(estimate))
estimates.append(estimate)
estimate_values.append(estimate.value)
estimate_values = convert_singleton_to_float(estimate_values)
print("estimate_values", estimate_values)
print("\n\nRunning the refuters:\n")
for refuter in self.refuters:
print("The current refuter:", refuter)
for estimate in estimates:
if self.simulate_unobserved_confounding == True:
print("********%%%%%%%%%$$$$$&&^**^^^^*^*^*")
if refuter.name == 'dummy_outcome_refuter':
add_unobserved_confounder = AddUnobservedCommonCause(data, identified_estimand, estimate)
print("add_unobserved_confounder", add_unobserved_confounder)
unobserved_confounder_values = add_unobserved_confounder.include_simulated_confounder(convergence_threshold = 0.11, c_star_max = 1500)
refuter.params['unobserved_confounder_values'] = unobserved_confounder_values
print('refuter.params', refuter.params)
refute = model.refute_estimate(
identified_estimand,
estimate,
method_name = refuter.name,
**refuter.params,
)
print("printing refute's type")
print(type(refute))
if(refuter.name == 'dummy_outcome_refuter'):
refute = refute[0]
if refute.refutation_result is not None:
p_value.append(refute.refutation_result['p_value'])
else:
p_value.append(None)
estimated_effect.append(refute.estimated_effect)
#print("refute.estimate_effect()", refute.estimate_effect())
new_effect.append(refute.new_effect)
estimated_effect = convert_singleton_to_float(estimated_effect)
new_effect = convert_singleton_to_float(new_effect)
p_value = convert_singleton_to_float(p_value)
true_value = convert_singleton_to_float(dgp.true_value)
print("estimated effect", estimated_effect)
print("new_effect", new_effect)
print("p_value", p_value)
print("true value", true_value)
self.results.append([exp, sample_size, dgp.NAME, *estimate_values, *estimated_effect, *new_effect, *p_value, true_value])
print("\n\nCompleted all experiments. Saving the data...")
COLUMNS = ['EXPERIMENT', 'SAMPLE_SIZE', 'DGP']
RESULT_CATEGORIES = ['ESTIMATED_EFFECT', 'NEW_EFFECT', 'P_VALUE']
estimator_names = [estimator.name for estimator in self.estimators]
refuter_names = [refuter.name for refuter in self.refuters]
for estimator_name in estimator_names:
COLUMNS += ['ORIGINAL_ESTIMATE'+ ':' + estimator_name]
for result_category in RESULT_CATEGORIES:
for refuter_name in refuter_names:
for estimator_name in estimator_names:
COLUMNS += [refuter_name + ':' + estimator_name + ':' + result_category]
COLUMNS += ['TRUE_VALUE']
csv_file = RESULTSFOLDER + self.experiment_name+ '_' + str(self.experiment_id) + '_' + str(datetime.utcnow().date()) + '_data.csv'
onlyres_csv_file = RESULTSFOLDER + "onlyres_"+ self.experiment_name+ '_' + str(self.experiment_id) + '_' + str(datetime.utcnow()) + '_data.csv'
self.results = pd.DataFrame(data=self.results,columns=COLUMNS)
self.results.to_csv(csv_file.replace(" ", ""), index=False)
print("Data has been saved in ",csv_file)
return csv_file
[ ]:
#Defining the Data Generating Process
ldgp = LinearDataGeneratingProcess(treatment=['t1'], outcome=['y'], confounder=['w1','w2', 'w3','w4','w5','w6'], effect_modifier=['x1','x2'], seed=None, treatment_is_binary=True)
#Defining the sample size
sample_size = 1000
dgp_dict = {'ldgp':ldgp}
dgp_list = []
dgp_list.append( dgp_dict['ldgp'] )
# Create a namedtuple to store the name of the estimator and the parameters passed
estimator_list = ["backdoor.linear_regression",
#"backdoor.propensity_score_stratification",
"backdoor.propensity_score_matching",
"backdoor.propensity_score_weighting",
"backdoor.econml.dml.DML",
"backdoor.econml.dr.LinearDRLearner",
#"backdoor.econml.metalearners.TLearner",
#"backdoor.econml.metalearners.XLearner",
#"backdoor.causalml.inference.meta.LRSRegressor",
#"backdoor.causalml.inference.meta.XGBTRegressor",
#"backdoor.causalml.inference.meta.MLPTRegressor",
#"backdoor.causalml.inference.meta.BaseXRegressor"
]
method_params= [ None,
#None,
{ "init_params":{} },
{ "init_params":{} },
{"init_params":{'model_y':GradientBoostingRegressor(),
'model_t': GradientBoostingRegressor(),
"model_final":LassoCV(fit_intercept=False),
'featurizer':PolynomialFeatures(degree=1, include_bias=True)},
"fit_params":{}},
{"init_params":{ 'model_propensity': LogisticRegressionCV(cv=3, solver='lbfgs', multi_class='auto'),
},
"fit_params":{}
},
'''{"init_params": {'models': GradientBoostingRegressor(n_estimators=100, max_depth=6, min_samples_leaf=int(sample_size/100))
},
"fit_params":{}
},
{"init_params":{'models': GradientBoostingRegressor(n_estimators=100, max_depth=6, min_samples_leaf=int(sample_size/100)),
'propensity_model': RandomForestClassifier(n_estimators=100, max_depth=6,
min_samples_leaf=int(sample_size/100))
},
"fit_params":{}
},
{"init_params":{},},
{"init_params":{
'learner':XGBRegressor()
}
}'''
]
estimator_tuples = []
refuter_tuples = []
refuter_list = ['dummy_outcome_refuter']
refuter_params = [{'num_simulations':5,'transformation_list': [('random_forest',{'n_estimators':100, 'max_depth':6})], 'true_causal_effect':(lambda x:0.5)}]
# Iterate through the names and parameters to create a list of namedtuples
for name, param in zip(estimator_list,method_params):
estimator_tuples.append(Estimator._make([name, param]))
for name, param in zip(refuter_list, refuter_params):
refuter_tuples.append(Refuter._make([name, param]))
[ ]:
def plot_MAEs(res):
true_value_column = res.columns[-1]
estimate_columns=res.columns[3:-1]
#print(estimate_columns)
#print(type(estimate_columns))
estimate_columns.append(pd.Index(res["TRUE_VALUE"]))
#print(estimate_columns)
fig, ax = plt.subplots()
MAE ={}
for colname in estimate_columns:
if colname not in ('ORIGINAL_ESTIMATE:backdoor.propensity_score_weighting',):
#'ORIGINAL_ESTIMATE:backdoor.econml.metalearners.TLearner'):
plt.plot(res[colname], res["TRUE_VALUE"], marker='o', linestyle="None", label=colname)
"Mean Absolute Error (MAE): {}".format(mean_absolute_error(res[colname], res["TRUE_VALUE"]))
MAE[colname] = mean_absolute_error(res[colname], res["TRUE_VALUE"])
fig.suptitle('Calibration plot showing the accuracy of different causal estimators [P(T=1)=0.9]')
ax.set_xlabel('Estimated effect')
ax.set_ylabel('True causal effect')
plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.20),
fancybox=True, shadow=True, ncol=2)
plt.show()
print("Printing MAE of various estimates: ")
MAE_values = {k: v for k, v in sorted(MAE.items(), key=lambda item: item[1], reverse = True)}
for k,v in MAE_values.items():
print(k, v)
[ ]:
def plot_estimators_and_refuters(refuter, estimator):
x = list(res['EXPERIMENT'])
y1 = list(res[refuter+':'+estimator+':ESTIMATED_EFFECT'])
y2 = list(res[refuter+':'+estimator+':NEW_EFFECT'])
#print(res['TRUE_VALUE'])
y3 = list(res['TRUE_VALUE'])
y4 = list(res[refuter+':'+estimator+':P_VALUE'])
plt.scatter(x, y1, c ="blue", label = "Estimated Effect")
plt.scatter(x, y2, c ="red", label = "New Effect")
plt.scatter(x, y3, c ="green", label = "True Value")
plt.scatter(x, y4, c ="yellow", label = "P Value")
plt.xlabel("EXPERIMENT")
plt.ylabel("EFFECT")
legend = plt.legend(loc=4, fontsize='small', fancybox=True)
plt.title(estimator)
plt.show()
plt.savefig(estimator+'.png')
[ ]:
def plot_deviations(estimator_list, deviation_list):
plt.scatter(estimator_list, deviation_list)
plt.xticks(estimator_list, estimator_list, rotation='vertical')
plt.show()
观测到的未建模混杂误差#
对于每个估计器,我们使用虚拟结果驳斥器来检查每个估计器的观测到的未建模混杂误差。也就是说,我们仅在观测到的混杂因素上运行每个估计器的驳斥测试,并分析在观测变量中存在多少未建模的混杂误差。
[ ]:
# Define the properties of the experiment
# The name of the experiment
# The experiment ID
# The number of experiments to be run with the SAME parameters
# The size of the samples to be run
# The list of DGPs to be run
# The list of estimators
observed_confounding_error = Experiment(
experiment_name='Test',
experiment_id='1',
num_experiments=10, # 10
sample_sizes=sample_size,
dgps=dgp_list,
estimators=estimator_tuples,
refuters=refuter_tuples,
simulate_unobserved_confounding = False
)
# Run the experiment
res = pd.read_csv(observed_confounding_error.experiment())
[ ]:
#PLOT
#This plot shows the Mean Absolute Error of the Orginal Estimate from the true value and of the New Effect from
#the expected value for each estimator.
plot_MAEs(res)
基于原始估计值的排名#
原始估计值是在存在真实值(即地面真相)的情况下计算的。然而,在许多现实生活中数据集中,地面真相可能未知。因此,我们希望由驳斥测试产生的排名与从原始估计值获得的排名一致。根据原始估计值,估计器的排名应如下(MAE 最小的方法应获得最佳排名):1. DMLCateEstimator 2. LinearRegression 3. LinearDRLearner 4. Propensity Score Matching
[ ]:
estimator_list = ["backdoor.linear_regression",
#"backdoor.propensity_score_stratification",
"backdoor.propensity_score_matching",
"backdoor.econml.dml.DML",
"backdoor.econml.dr.LinearDRLearner",
#"backdoor.econml.metalearners.TLearner",
#"backdoor.econml.metalearners.XLearner",
#"backdoor.causalml.inference.meta.LRSRegressor",
#"backdoor.causalml.inference.meta.XGBTRegressor",
#"backdoor.causalml.inference.meta.MLPTRegressor",
#"backdoor.causalml.inference.meta.BaseXRegressor"
]
[ ]:
#This plot shows the deviation of the original estimate, the new effect and the estimated effect from the true value
refuter = 'dummy_outcome_refuter'
deviation_list = []
for estimator in estimator_list:
plot_estimators_and_refuters(refuter, estimator)
avg_deviation = ((res[refuter+':'+estimator+':NEW_EFFECT']).sum(axis=0))
print(avg_deviation)
deviation_list.append(avg_deviation)
[ ]:
plot_deviations(estimator_list, deviation_list)
for i in range(len(estimator_list)):
print(estimator_list[i] +": "+ str(deviation_list[i]))
[ ]:
{k: v for k, v in sorted(zip(estimator_list, deviation_list), key=lambda item: item[1], reverse = True)}
基于新效应(驳斥结果)的排名#
基于偏差绝对值的排名是:1. Propensity Score Matching 2. Linear DR Learner 3. DML CATE Estimator 4. Linear Regression
显然,观测到的未建模混杂误差无法与基于原始估计值的排名相匹配。它甚至无法判断根据真实值,这些方法中明显的赢家是 DML CATE Estimator
未观测到的混杂误差#
对于每个估计器,我们现在模拟未观测到的混杂因素,并使用虚拟结果驳斥器检查其效应,以检查每个估计器的未观测到的混杂误差。也就是说,我们不仅在观测到的混杂因素上运行每个估计器的驳斥测试,还在我们使用 AddUnobservedCommonCause 类模拟的未观测到的混杂因素上运行,并分析是否存在一个强烈的未观测(缺失)且需要考虑的混杂因素。
[ ]:
unobserved_confounding_error = Experiment(
experiment_name='Test',
experiment_id='2',
num_experiments=10, # 10
sample_sizes=sample_size,
dgps=dgp_list,
estimators=estimator_tuples,
refuters=refuter_tuples,
simulate_unobserved_confounding = True
)
# Run the experiment
res = pd.read_csv(unobserved_confounding_error.experiment())
[ ]:
##This plot shows the Mean Absolute Error of the Orginal Estimate from the true value and of the New Effect from
#the expected value for each estimator.
plot_MAEs(res)
基于原始估计值的排名#
原始估计值是在存在真实值(即地面真相)的情况下计算的。然而,在许多现实生活中数据集中,地面真相可能未知。因此,我们希望由驳斥测试产生的排名与从原始估计值获得的排名一致。根据原始估计值,估计器的排名应如下(MAE 最小的方法应获得最佳排名):1. DMLCateEstimator 2. Propensity Score Matching 3. LinearRegression 4. LinearDRLearner
[ ]:
#This plot shows the deviation of the original estimate, the new effect and the estimated effect from the true value
refuter = 'dummy_outcome_refuter'
deviation_list = []
for estimator in estimator_list:
plot_estimators_and_refuters(refuter, estimator)
avg_deviation = ((res[refuter+':'+estimator+':NEW_EFFECT']).sum(axis=0))
print(avg_deviation)
deviation_list.append(avg_deviation)
[ ]:
plot_deviations(estimator_list, deviation_list)
for i in range(len(estimator_list)):
print(estimator_list[i] +": "+ str(deviation_list[i]))
[ ]:
{k: v for k, v in sorted(zip(estimator_list, deviation_list), key=lambda item: item[1], reverse = True)}
基于新效应(驳斥结果)的排名#
基于偏差绝对值的排名是:1. DML 2. Linear DR Learner 3. Propensity Score Matching 4. Linear Regression