# ttaxt/example-filltable.py
# (exported listing header: 302 lines, 12 KiB, Python)

# Re-running the script due to the environment reset
import requests
import pandas as pd
import pyvat
import sqlite3
import pyarrow
from SPARQLWrapper import SPARQLWrapper, JSON
import os
from datetime import date
import logging
# Set up logging
# Timestamped INFO-level logging for the entire script run.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Database setup
db_path = 'ttaxt.db'  # SQLite file in the current working directory (created if absent)
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
logging.info("Database connection established")
# Create necessary tables
# Reverse charge values (as defined in pyvat):
# 1 - Charge VAT
# 2 - Reverse charge
# 3 - No charge
# Schema overview:
#  - countries: one row per ISO 3166-1 alpha-2 code -> name, continent, ISO 4217 currency
#  - vat_rules: VAT rate/action per (country, is_registered) pair; NULL means "not determined"
#  - us_sales_tax_compliance: per-state nexus thresholds plus running totals
#    (threshold_reached/sales_block/total_* are initialized later and updated elsewhere)
cursor.executescript('''
CREATE TABLE IF NOT EXISTS countries (
country_code TEXT PRIMARY KEY,
country_name TEXT,
continent TEXT,
currency TEXT
);
CREATE TABLE IF NOT EXISTS vat_rules (
country_code TEXT,
is_registered INTEGER,
vat_rate REAL DEFAULT NULL,
vat_action INTEGER DEFAULT NULL,
PRIMARY KEY (country_code, is_registered)
);
CREATE TABLE IF NOT EXISTS us_sales_tax_compliance (
state_code TEXT PRIMARY KEY,
sales_threshold REAL,
transaction_threshold INTEGER,
sales_tax_rate REAL,
threshold_reached BOOLEAN,
sales_block BOOLEAN,
total_sales REAL,
total_transactions INTEGER
);
''')
logging.info("Tables created successfully")
# Function to fetch country and currency data from Wikidata using SPARQL
def fetch_wikidata_countries_and_currency(parquet_file='wikidata_countries_currency.parquet'):
    """Fetch country, currency, and continent data from Wikidata, with a local cache.

    Parameters
    ----------
    parquet_file : str
        Path of the parquet cache file. If it exists it is read and returned
        directly; otherwise the Wikidata SPARQL endpoint is queried and the
        result is written there. The default is the original hard-coded cache
        path, so existing callers are unaffected.

    Returns
    -------
    pandas.DataFrame
        Columns: 'Country Code', 'Country Name', 'Continent', 'Currency'.
    """
    # Cache hit: skip the (slow, rate-limited) remote query entirely.
    if os.path.exists(parquet_file):
        return pd.read_parquet(parquet_file)

    sparql = SPARQLWrapper("https://query.wikidata.org/sparql")
    sparql.setQuery("""
SELECT ?countryLabel ?iso_alpha_2 ?currency_code ?continentLabel WHERE {
?country wdt:P31 wd:Q6256; # Select countries
wdt:P297 ?iso_alpha_2; # Get ISO 3166-1 alpha-2 codes
wdt:P38 ?currency; # Get currency
wdt:P30 ?continent. # Get continent
?currency wdt:P498 ?currency_code. # Get ISO 4217 currency code
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
""")
    sparql.setReturnFormat(JSON)
    results = sparql.query().convert()
    # Flatten the JSON result bindings into plain tuples.
    data = []
    for result in results["results"]["bindings"]:
        country_name = result["countryLabel"]["value"]
        iso_alpha_2 = result["iso_alpha_2"]["value"]
        currency_code = result["currency_code"]["value"]
        continent = result["continentLabel"]["value"]
        data.append((iso_alpha_2, country_name, continent, currency_code))
        logging.info(f"Appended data: {iso_alpha_2}, {country_name}, {continent}, {currency_code}")
    df = pd.DataFrame(data, columns=['Country Code', 'Country Name', 'Continent', 'Currency'])
    logging.info(f"Created DataFrame with {len(df)} rows")
    # Save the data to a parquet file so subsequent runs take the cache branch above.
    df.to_parquet(parquet_file)
    logging.info(f"Saved DataFrame to parquet file: {parquet_file}")
    return df
# Set initial seller
seller = pyvat.Party('NL', is_business=True)  # Assuming the seller is a business in the Netherlands
# 1. Fetch country, currency, and continent data using SPARQL
logging.info("Fetching country data from Wikidata")
country_data_df = fetch_wikidata_countries_and_currency()
logging.info(f"Fetched {len(country_data_df)} countries from Wikidata")
logging.info(f"First few rows of country data:\n{country_data_df.head()}")
# 2. Download and cross-check with ISO 4217 XLS
# NOTE: 'currrency' (three r's) is the actual path on six-group.com, not a typo here.
iso_4217_url = "https://www.six-group.com/dam/download/financial-information/data-center/iso-currrency/lists/list-one.xls"
local_file_path = "iso_4217_list_one.xls"
# Download the file if it doesn't exist
if not os.path.exists(local_file_path):
    logging.info(f"Downloading ISO 4217 XLS file from {iso_4217_url}")
    response = requests.get(iso_4217_url)
    # Fail fast on HTTP errors instead of silently writing an error page to disk.
    # (The LOST download later in this script already checks its status code;
    # this makes the two downloads consistent.)
    response.raise_for_status()
    with open(local_file_path, 'wb') as file:
        file.write(response.content)
    logging.info("ISO 4217 XLS file downloaded successfully")
else:
    logging.info("ISO 4217 XLS file already exists")
# Read the Excel file, starting from row 4 (0-based index, so 3)
iso_4217_xls = pd.read_excel(local_file_path, sheet_name=0, header=3)
iso_4217_valid_codes = iso_4217_xls['Alphabetic Code'].dropna().tolist()
logging.info(f"Found {len(iso_4217_valid_codes)} valid currency codes in ISO 4217 XLS")
# Filter out defunct currency codes
valid_countries_df = country_data_df[country_data_df['Currency'].isin(iso_4217_valid_codes)]
logging.info(f"Filtered to {len(valid_countries_df)} countries with valid currencies")
logging.info(f"First few rows of valid countries:\n{valid_countries_df.head()}")
# Populate the countries table with filtered data
logging.info("Populating countries table")
for index, row in valid_countries_df.iterrows():
    cursor.execute('''
INSERT OR REPLACE INTO countries (country_code, country_name, continent, currency)
VALUES (?, ?, ?, ?)
''', (row['Country Code'], row['Country Name'], row['Continent'], row['Currency']))
    logging.info(f"Inserted country: {row['Country Name']} ({row['Country Code']}) into the database")
conn.commit()
logging.info("Countries table populated successfully")
# 3. Populate VAT rules using pyvat
logging.info("Populating VAT rules")
current_date = date.today()
# Classify every sale as a generic electronic service for VAT purposes.
item_type = pyvat.ItemType.generic_electronic_service
# One pass per buyer kind: registered business (True) and consumer (False).
business_types = [True, False]
total_countries = len(valid_countries_df)  # NOTE(review): currently unused
for business_type in business_types:
    logging.info(f"Processing VAT rules for {'business' if business_type else 'non-business'} customers")
    for index, row in valid_countries_df.iterrows():
        country_code = row['Country Code']
        country_name = row['Country Name']
        buyer = pyvat.Party(country_code, is_business=business_type)
        # Ask pyvat for the VAT treatment of this seller->buyer sale as of today.
        vat_charge = pyvat.get_sale_vat_charge(current_date, item_type, buyer, seller)
        # rate is presumably a Decimal — converted to float for the REAL column; NULL kept as None.
        vat_rate = float(vat_charge.rate) if vat_charge.rate is not None else None
        is_registered = 1 if business_type else 0  # Convert boolean to integer
        # action enum value: 1=charge VAT, 2=reverse charge, 3=no charge.
        vat_action = vat_charge.action.value if vat_charge.action is not None else None
        cursor.execute('''
INSERT OR REPLACE INTO vat_rules (country_code, is_registered, vat_rate, vat_action)
VALUES (?, ?, ?, ?)
''', (country_code, is_registered, vat_rate, vat_action))
        logging.info(f"Processed VAT rules for {country_name} ({country_code}): VAT rate = {vat_rate}, is_registered = {is_registered}, VAT action = {vat_action}")
    # Commit once per buyer-kind pass rather than per row.
    conn.commit()
    logging.info(f"VAT rules for {'business' if business_type else 'non-business'} customers committed to database")
# print table
logging.info("Fetching all VAT rules from database")
cursor.execute('''
SELECT * FROM vat_rules
''')
vat_rules = cursor.fetchall()
logging.info(f"Total VAT rules: {len(vat_rules)}")
logging.info(f"First few VAT rules:\n{vat_rules[:5]}")
# Commit changes and close the database
# (the US-sales-tax section below re-opens its own connection)
conn.commit()
conn.close()
logging.info("Database connection closed")
# Let's download the U.S. sales tax compliance data from the web
# From: https://taxfoundation.org/wp-content/uploads/2024/07/LOST_July_2024_Rate_Table.xlsx
# Define file paths
lost_file_path = "LOST_July_2024_Rate_Table.xlsx"  # Tax Foundation local-option sales tax (LOST) rate table
# This is data manually gathered from the sources indicated in the file to crosscheck facts
thresholds_file_path = "state_sales_tax_thresholds.xlsx"  # hand-assembled; must already exist locally
# Check if files exist, download if they don't
if not os.path.exists(lost_file_path):
    logging.info("Downloading U.S. sales tax compliance data")
    url = "https://taxfoundation.org/wp-content/uploads/2024/07/LOST_July_2024_Rate_Table.xlsx"
    response = requests.get(url)
    if response.status_code == 200:
        with open(lost_file_path, "wb") as file:
            file.write(response.content)
        logging.info("U.S. sales tax data downloaded successfully")
    else:
        logging.error(f"Failed to download U.S. sales tax data. Status code: {response.status_code}")
        raise Exception("Failed to download U.S. sales tax data")
else:
    logging.info("U.S. sales tax data file already exists, using existing file")
# The thresholds workbook cannot be fetched automatically — abort if it is missing.
if not os.path.exists(thresholds_file_path):
    logging.error("State sales tax thresholds file not found")
    raise FileNotFoundError("state_sales_tax_thresholds.xlsx is missing")
else:
    logging.info("Using existing state sales tax thresholds file")
# Read the Excel files
logging.info("Reading U.S. sales tax and threshold data from Excel files")
lost_df = pd.read_excel(lost_file_path, sheet_name="Sheet1")
thresholds_df = pd.read_excel(thresholds_file_path, sheet_name=0)  # Explicitly read the first sheet
def clean_state_name(state_name):
    """Normalize a state-name cell: drop any '(...)' annotation and surrounding
    whitespace; missing values (NaN/None) come back as pd.NA."""
    if pd.isna(state_name):
        return pd.NA
    # Keep only the text before the first '(' — e.g. "Alaska (a)" -> "Alaska".
    base, _, _ = str(state_name).partition('(')
    return base.strip()
# Clean state names in both dataframes
lost_df['State'] = lost_df['State'].apply(clean_state_name)
thresholds_df['State Name'] = thresholds_df['State Name'].apply(clean_state_name)
# Process and insert data into us_sales_tax_compliance table
logging.info("Processing and inserting U.S. sales tax and threshold data")
# Re-open the database (it was closed at the end of the VAT section).
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
for _, threshold_row in thresholds_df.iterrows():
    state_name = threshold_row['State Name']
    state_code = threshold_row['State Code']
    sales_threshold = threshold_row['Threshold']
    transaction_threshold = threshold_row['Transactions']
    # Skip rows where state_name is NA or null
    if pd.isna(state_name):
        logging.info(f"Skipping row with NA state name in thresholds data")
        continue
    logging.info(f"Processing state: {state_name}")
    # Find corresponding LOST data
    lost_row = lost_df[lost_df['State'] == state_name]
    if not lost_row.empty:
        # NOTE(review): despite the variable name, this reads the 'State Tax Rate'
        # column, not a combined state+local rate — confirm which rate is intended.
        combined_rate = lost_row['State Tax Rate'].values[0]
        logging.info(f"LOST data found - Combined rate: {combined_rate}")
    else:
        combined_rate = None
        logging.warning(f"No LOST data found for {state_name}")
    # Convert 'None' to None for SQLite
    sales_threshold = None if pd.isna(sales_threshold) else sales_threshold
    transaction_threshold = None if pd.isna(transaction_threshold) else transaction_threshold
    combined_rate = None if pd.isna(combined_rate) else combined_rate
    # Remove '$' and ',' from sales_threshold if it's not None
    if sales_threshold is not None:
        sales_threshold = float(str(sales_threshold).replace('$', '').replace(',', ''))
    logging.info(f"Inserting data for {state_name}: State Code: {state_code}, Sales threshold: {sales_threshold}, Transaction threshold: {transaction_threshold}, Combined rate: {combined_rate}")
    # NOTE(review): INSERT OR REPLACE resets threshold_reached/sales_block and the
    # running totals to their initial values on every re-run — confirm that is
    # intended for this initial-fill script.
    cursor.execute('''
INSERT OR REPLACE INTO us_sales_tax_compliance
(state_code, sales_threshold, transaction_threshold, sales_tax_rate, threshold_reached, sales_block, total_sales, total_transactions)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (state_code, sales_threshold, transaction_threshold, combined_rate, False, False, 0, 0))
    logging.info(f"Data inserted for {state_name}")
conn.commit()
logging.info("U.S. sales tax compliance data inserted successfully")
# Check for any states in LOST data that weren't in thresholds data
lost_states = set(lost_df['State'].dropna())
threshold_states = set(thresholds_df['State Name'].dropna())
missing_states = lost_states - threshold_states
if missing_states:
    logging.warning(f"States in LOST data but not in thresholds data: {missing_states}")
# Print table
logging.info("Fetching all U.S. sales tax compliance data from database")
cursor.execute('''
SELECT * FROM us_sales_tax_compliance
''')
us_tax_data = cursor.fetchall()
logging.info(f"Total U.S. sales tax entries: {len(us_tax_data)}")
logging.info(f"First few U.S. sales tax entries:\n{us_tax_data[:5]}")
conn.close()
logging.info("Database connection closed")
logging.info(f"Excel files kept at: {lost_file_path} and {thresholds_file_path}")
# Provide the path to the created SQLite database
logging.info(f"SQLite database created at: {db_path}")
db_path  # NOTE(review): bare expression — notebook residue; has no effect when run as a script.