Air

whitehousechef·2025년 7월 21일

csv parser
original code:

countries=("country,latitude,longitude,country_name",
           "AD,42.546245,1.601554,Andorra",
           "AE,23.424076,53.847818,United Arab Emirates",
            "AF,33.93911,67.709953,Afghanistan",
            "AG,17.060816,-61.796428,Antigua and Barbuda")

def read_file(string_input):
    """string_input = FROM countries.csv"""
    lst = string_input.split(" ")
    if lst[0] == "FROM" and lst[1] == "countries.csv":
        print(countries)

# read_file("FROM countries.csv")

def select(string_input):
    lst = string_input.split(" ")
    selected_fields_tmp = lst[3]
    selected_fields = selected_fields_tmp.split(",")
    dic={
        "country":0,
        "latitude":1,
        "longitude":2,
        "country_name":3
    }
    lst_index= []
    for field in selected_fields:
        index = dic[field]
        lst_index.append(index)
    for i in range(1, len(countries)):
        data = countries[i]
        lst_data = data.split(",")
        tmp = ""
        for i in lst_index:
            tmp+= lst_data[i]
            tmp+=","
        print(tmp[:-1])


# select("FROM countries.csv SELECT country,country_name,latitude")

def take(string_input):
    """sample string input = FROM countries.csv TAKE 2"""
    lst = string_input.split(" ")
    take_count = lst[-1]
    take_keyword = lst[-2]

    if len(countries)-1 <int(take_count):
        print("not enough data")
        return

    if take_keyword == "TAKE":
        for i in range(1, int(take_count)+1):
            print(countries[i])

# take("FROM countries.csv TAKE 2")

def select_take(string_input):
    selected_fields, take_count = handle_user_input(string_input)

    dic={
        "country":0,
        "latitude":1,
        "longitude":2,
        "country_name":3
    }

    lst_index = select_fields_via_index(dic, selected_fields)

    until_index = take_data_count(take_count)

    print_take_count_data(lst_index, until_index)


def print_take_count_data(lst_index, until_index):
    for i in range(1, until_index):
        data = countries[i]
        lst_data = data.split(",")
        tmp = ""
        for i in lst_index:
            tmp += lst_data[i]
            tmp += ","
        print(tmp[:-1])


def take_data_count(take_count):
    until_index = 0
    if take_count is not None:
        until_index = int(take_count) + 1
    else:
        until_index = len(countries)
    return until_index


def select_fields_via_index(dic, selected_fields):
    lst_index = []
    for field in selected_fields:
        index = dic[field]
        lst_index.append(index)
    return lst_index


def handle_user_input(string_input):
    lst = string_input.split(" ")
    selected_fields_tmp = lst[3]

    if len(string_input)>4:
        take_count = lst[-1]
        take_keyword = lst[-2]
    selected_fields = selected_fields_tmp.split(",")
    if take_count and take_keyword:
        return selected_fields, take_count
    else:
        return selected_fields

select_take("FROM countries.csv SELECT country,country_name,latitude TAKE 2")

refactored code

class CSVQueryParser:
    """A simple SQL-like query parser for CSV data."""
    
    def __init__(self, data):
        """Initialize with CSV data as list of strings."""
        self.data = data
        self.headers = self._parse_headers()
        self.field_indices = self._create_field_mapping()
    
    def _parse_headers(self):
        """Extract headers from the first row."""
        return self.data[0].split(",")
    
    def _create_field_mapping(self):
        """Create a mapping of field names to their indices."""
        return {header: idx for idx, header in enumerate(self.headers)}
    
    def _parse_query(self, query):
        """Parse the query string and extract components."""
        parts = query.split()
        
        # Find key components
        from_idx = parts.index("FROM") if "FROM" in parts else -1
        select_idx = parts.index("SELECT") if "SELECT" in parts else -1
        take_idx = parts.index("TAKE") if "TAKE" in parts else -1
        
        # Extract table name (not used in this simple implementation)
        table_name = parts[from_idx + 1] if from_idx != -1 and from_idx + 1 < len(parts) else None
        
        # Extract selected fields
        selected_fields = []
        if select_idx != -1 and select_idx + 1 < len(parts):
            fields_str = parts[select_idx + 1]
            selected_fields = fields_str.split(",")
        
        # Extract limit count
        limit_count = None
        if take_idx != -1 and take_idx + 1 < len(parts):
            limit_count = int(parts[take_idx + 1])
        
        return {
            'table': table_name,
            'fields': selected_fields,
            'limit': limit_count
        }
    
    def _get_field_indices(self, field_names):
        """Get indices for the specified field names."""
        indices = []
        for field in field_names:
            if field in self.field_indices:
                indices.append(self.field_indices[field])
            else:
                raise ValueError(f"Field '{field}' not found in data")
        return indices
    
    def _format_row(self, row_data, field_indices):
        """Format a single row based on selected field indices."""
        row_parts = row_data.split(",")
        selected_values = [row_parts[idx] for idx in field_indices]
        return ",".join(selected_values)
    
    def execute_query(self, query):
        """Execute a query and return the results."""
        parsed = self._parse_query(query)
        
        # Determine which fields to select
        if parsed['fields']:
            field_indices = self._get_field_indices(parsed['fields'])
        else:
            field_indices = list(range(len(self.headers)))
        
        # Determine how many rows to return
        data_rows = self.data[1:]  # Skip header row
        if parsed['limit']:
            if parsed['limit'] > len(data_rows):
                raise ValueError("Not enough data for the requested limit")
            data_rows = data_rows[:parsed['limit']]
        
        # Format and return results
        results = []
        for row in data_rows:
            formatted_row = self._format_row(row, field_indices)
            results.append(formatted_row)
        
        return results
    
    def print_query_results(self, query):
        """Execute query and print results to console."""
        try:
            results = self.execute_query(query)
            for result in results:
                print(result)
        except ValueError as e:
            print(f"Error: {e}")


# Example usage
def main():
    # Sample data
    countries_data = [
        "country,latitude,longitude,country_name",
        "AD,42.546245,1.601554,Andorra",
        "AE,23.424076,53.847818,United Arab Emirates",
        "AF,33.93911,67.709953,Afghanistan",
        "AG,17.060816,-61.796428,Antigua and Barbuda"
    ]
    
    # Create parser instance
    parser = CSVQueryParser(countries_data)
    
    # Execute various queries
    print("=== Query 1: SELECT country,country_name,latitude TAKE 2 ===")
    parser.print_query_results("FROM countries.csv SELECT country,country_name,latitude TAKE 2")
    
    print("\n=== Query 2: SELECT country,country_name ===")
    parser.print_query_results("FROM countries.csv SELECT country,country_name")
    
    print("\n=== Query 3: TAKE 2 (all fields) ===")
    parser.print_query_results("FROM countries.csv TAKE 2")


if __name__ == "__main__":
    main()

0개의 댓글