csv parser
original code:
countries=("country,latitude,longitude,country_name",
"AD,42.546245,1.601554,Andorra",
"AE,23.424076,53.847818,United Arab Emirates",
"AF,33.93911,67.709953,Afghanistan",
"AG,17.060816,-61.796428,Antigua and Barbuda")
def read_file(string_input):
"""string_input = FROM countries.csv"""
lst = string_input.split(" ")
if lst[0] == "FROM" and lst[1] == "countries.csv":
print(countries)
# read_file("FROM countries.csv")
def select(string_input):
lst = string_input.split(" ")
selected_fields_tmp = lst[3]
selected_fields = selected_fields_tmp.split(",")
dic={
"country":0,
"latitude":1,
"longitude":2,
"country_name":3
}
lst_index= []
for field in selected_fields:
index = dic[field]
lst_index.append(index)
for i in range(1, len(countries)):
data = countries[i]
lst_data = data.split(",")
tmp = ""
for i in lst_index:
tmp+= lst_data[i]
tmp+=","
print(tmp[:-1])
# select("FROM countries.csv SELECT country,country_name,latitude")
def take(string_input):
"""sample string input = FROM countries.csv TAKE 2"""
lst = string_input.split(" ")
take_count = lst[-1]
take_keyword = lst[-2]
if len(countries)-1 <int(take_count):
print("not enough data")
return
if take_keyword == "TAKE":
for i in range(1, int(take_count)+1):
print(countries[i])
# take("FROM countries.csv TAKE 2")
def select_take(string_input):
selected_fields, take_count = handle_user_input(string_input)
dic={
"country":0,
"latitude":1,
"longitude":2,
"country_name":3
}
lst_index = select_fields_via_index(dic, selected_fields)
until_index = take_data_count(take_count)
print_take_count_data(lst_index, until_index)
def print_take_count_data(lst_index, until_index):
for i in range(1, until_index):
data = countries[i]
lst_data = data.split(",")
tmp = ""
for i in lst_index:
tmp += lst_data[i]
tmp += ","
print(tmp[:-1])
def take_data_count(take_count):
until_index = 0
if take_count is not None:
until_index = int(take_count) + 1
else:
until_index = len(countries)
return until_index
def select_fields_via_index(dic, selected_fields):
lst_index = []
for field in selected_fields:
index = dic[field]
lst_index.append(index)
return lst_index
def handle_user_input(string_input):
lst = string_input.split(" ")
selected_fields_tmp = lst[3]
if len(string_input)>4:
take_count = lst[-1]
take_keyword = lst[-2]
selected_fields = selected_fields_tmp.split(",")
if take_count and take_keyword:
return selected_fields, take_count
else:
return selected_fields
select_take("FROM countries.csv SELECT country,country_name,latitude TAKE 2")
refactored code
class CSVQueryParser:
"""A simple SQL-like query parser for CSV data."""
def __init__(self, data):
"""Initialize with CSV data as list of strings."""
self.data = data
self.headers = self._parse_headers()
self.field_indices = self._create_field_mapping()
def _parse_headers(self):
"""Extract headers from the first row."""
return self.data[0].split(",")
def _create_field_mapping(self):
"""Create a mapping of field names to their indices."""
return {header: idx for idx, header in enumerate(self.headers)}
def _parse_query(self, query):
"""Parse the query string and extract components."""
parts = query.split()
# Find key components
from_idx = parts.index("FROM") if "FROM" in parts else -1
select_idx = parts.index("SELECT") if "SELECT" in parts else -1
take_idx = parts.index("TAKE") if "TAKE" in parts else -1
# Extract table name (not used in this simple implementation)
table_name = parts[from_idx + 1] if from_idx != -1 and from_idx + 1 < len(parts) else None
# Extract selected fields
selected_fields = []
if select_idx != -1 and select_idx + 1 < len(parts):
fields_str = parts[select_idx + 1]
selected_fields = fields_str.split(",")
# Extract limit count
limit_count = None
if take_idx != -1 and take_idx + 1 < len(parts):
limit_count = int(parts[take_idx + 1])
return {
'table': table_name,
'fields': selected_fields,
'limit': limit_count
}
def _get_field_indices(self, field_names):
"""Get indices for the specified field names."""
indices = []
for field in field_names:
if field in self.field_indices:
indices.append(self.field_indices[field])
else:
raise ValueError(f"Field '{field}' not found in data")
return indices
def _format_row(self, row_data, field_indices):
"""Format a single row based on selected field indices."""
row_parts = row_data.split(",")
selected_values = [row_parts[idx] for idx in field_indices]
return ",".join(selected_values)
def execute_query(self, query):
"""Execute a query and return the results."""
parsed = self._parse_query(query)
# Determine which fields to select
if parsed['fields']:
field_indices = self._get_field_indices(parsed['fields'])
else:
field_indices = list(range(len(self.headers)))
# Determine how many rows to return
data_rows = self.data[1:] # Skip header row
if parsed['limit']:
if parsed['limit'] > len(data_rows):
raise ValueError("Not enough data for the requested limit")
data_rows = data_rows[:parsed['limit']]
# Format and return results
results = []
for row in data_rows:
formatted_row = self._format_row(row, field_indices)
results.append(formatted_row)
return results
def print_query_results(self, query):
"""Execute query and print results to console."""
try:
results = self.execute_query(query)
for result in results:
print(result)
except ValueError as e:
print(f"Error: {e}")
# Example usage
def main():
# Sample data
countries_data = [
"country,latitude,longitude,country_name",
"AD,42.546245,1.601554,Andorra",
"AE,23.424076,53.847818,United Arab Emirates",
"AF,33.93911,67.709953,Afghanistan",
"AG,17.060816,-61.796428,Antigua and Barbuda"
]
# Create parser instance
parser = CSVQueryParser(countries_data)
# Execute various queries
print("=== Query 1: SELECT country,country_name,latitude TAKE 2 ===")
parser.print_query_results("FROM countries.csv SELECT country,country_name,latitude TAKE 2")
print("\n=== Query 2: SELECT country,country_name ===")
parser.print_query_results("FROM countries.csv SELECT country,country_name")
print("\n=== Query 3: TAKE 2 (all fields) ===")
parser.print_query_results("FROM countries.csv TAKE 2")
if __name__ == "__main__":
main()