nestjs CSV postgres

욱2·2023년 5월 29일
0

NodeJs

목록 보기
13/13

복붙과 이런저런 실험과 chatGpt와 끊임없는 토론을 통해 성공? 98% 성공 시켰습니다.

  1. 파일을 불러와서 서비스로 보낸다.
    @Post('/process')
    async processCSV(): Promise<void> {

        const inputFile = path.resolve('src/stores/csv/222.csv');

        await this.storesService.processCSVFile(inputFile);
    }
  1. 서비스
    사용했던 부분:
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { StoresRepository } from './stores.repository';
import { Stores } from './stores.entity';
import { StoresSearchDto } from './dto/stores.dto';
import { CreateStoresDto } from './dto/createStores.dto';
import axios from 'axios';
import * as csvParser from 'csv-parser';
import { Pool } from 'pg';
import { QueryResult } from 'pg';
import { createReadStream } from 'fs';



@Injectable()
export class StoresService {
    private pool: Pool;
        constructor(
        @InjectRepository(StoresRepository)
        private storesRepository: StoresRepository,
     
        ) {
            this.pool = new Pool({
                host: 'localhost',
                port: 5432,
                user: 'postgres',
                password: '8785',
                database: '22-project',
            });
}

2-1) 저장1차 시도

async processCSVFile(inputFile: string): Promise<void> {
    return new Promise<void>((resolve, reject) => {
        createReadStream(inputFile, { encoding: 'utf-8' })
            .pipe(csvParser())
            .on('error', (error) => {
                console.error('Error reading CSV file:', error);
                reject(error);
            })
            .on('data', async (row: any) => {
                              
                if (row['상세영업상태코드'] === '01') {
                    const check = row['상세영업상태코드'];
                    const La = 11;
                    const Ma = 11;
                    const description = 'string';
                    const maxWaitingCnt = 3;
                    const currentWaitingCnt = 3;
                    const tableForTwo = Math.floor(Math.random() * 10);
                    const tableForFour = Math.floor(Math.random() * 10);
                    const storeName = row['사업장명'];
                    const category = row['위생업태명'];
                    const address = row['도로명전체주소'];
                    console.log(storeName)
                    console.log(check)
                    const query =
                        'INSERT INTO stores (storeName, description, "maxWaitingCnt", "currentWaitingCnt", "La", "Ma", "tableForFour", "tableForTwo", category, address) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)';
                    const values = [
                        storeName,
                        description,
                        maxWaitingCnt,
                        currentWaitingCnt,
                        La,
                        Ma,
                        tableForFour,
                        tableForTwo,
                        category,
                        address
                    ];

                    try {
                        const result: QueryResult = await this.pool.query(query, values);
                        console.log('Inserted', result.rowCount, 'row:', values);
                    } catch (error) {
                        console.error('Error occurred during insert:', error);
                    }
                }
            })
            .on('end', () => {
                resolve();
            })
            .on('finish', () => {
                console.log('CSV processing completed.');
            });
    });
}

실패 했던 이유: 대소문자. 아주 간단한 문제였다. storeName에서 N이 대문자라서 ""안에 넣어줘야했다. description은 모두 소문자라 필요없었던 것이다.

'INSERT INTO stores (storeName, description, "maxWaitingCnt", "currentWaitingCnt", "La", "Ma", "tableForFour", "tableForTwo", category, address) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)'

2-2) 저장 2차시도
위를 수정해서 데이터를 넣어봤다. 문제는 바로 순서이다.

 .on('data', async (row: any)

스트림은 이벤트 기반으로 동작하며 데이터('data') 이벤트는 각 행마다 비동기적으로 발생합니다. 이전 행의 처리가 완료되기를 기다리지 않고 다음 행으로 넘어가기 때문에 순서가 뒤섞이고 행이 건너뛰어질 수 있습니다.

처리가 순서대로 진행되도록 보장하려면 코드를 수정하여 데이터 이벤트에 의존하는 대신 for...of 루프와 []배열을 사용하면 된다고 한다.

2-3) 저장 3차시도
상태가 너무 더러워서 repo로 몇가지 옮겼다

async processCSVFile(inputFile: string): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const rows: any[] = [];

    createReadStream(inputFile, { encoding: 'utf-8' })
      .pipe(csvParser())
      .on('error', (error) => {
        console.error('Error reading CSV file:', error);
        reject(error);
      })
      .on('data', (row: any) => {
        rows.push(row);
      })
      .on('end', async () => {
        for (const row of rows) {
          await this.storesRepository.processCSVFile(row);
        }
        resolve();
      })
      .on('finish', () => {
        console.log('CSV processing completed.');
      });
  });
}

=========================================================================
repository
 async processCSVFile(row: any): Promise<void> {
    if (row['상세영업상태코드'] === '01') {
      const La = 11;
      const Ma = 11;
      const description = 'string';
      const maxWaitingCnt = 3;
      const currentWaitingCnt = 3;
      const tableForTwo = Math.floor(Math.random() * 10);
      const tableForFour = Math.floor(Math.random() * 10);
      const storeName = row['사업장명'];
      const category = row['위생업태명'];
      const address = row['도로명전체주소'];

      const store = this.storesRepository.create({
        storeName,
        description,
        maxWaitingCnt,
        currentWaitingCnt,
        La,
        Ma,
        tableForFour,
        tableForTwo,
        category,
        address,
      });

      try {
        await this.storesRepository.save(store);
        console.log('Inserted row:', store);
      } catch (error) {
        console.error('Error occurred during insert:', error);
      }
    }
  }

말 루프와 배열을 사용해서 그대로 실행해봤다. 그런데 csv파일이 너무 커서 중간에

FATAL ERROR: Reached heap limit Allocation failed - JavaScript heap out of memory
 1: 00007FF752E3027F node_api_throw_syntax_error+175631
 2: 00007FF752DB5B66 v8::internal::wasm::WasmCode::safepoint_table_offset+63110
 3: 00007FF752DB6F22 v8::internal::wasm::WasmCode::safepoint_table_offset+68162
 4: 00007FF7538514C4 v8::Isolate::ReportExternalAllocationLimitReached+116
 5: 00007FF75383C822 v8::Isolate::Exit+674
 6: 00007FF7536BE66C v8::internal::EmbedderStackStateScope::ExplicitScopeForTesting+124
 7: 00007FF7536BB88B v8::internal::Heap::CollectGarbage+3963
 8: 00007FF7536D1AC3 v8::internal::HeapAllocator::AllocateRawWithLightRetrySlowPath+2099
 9: 00007FF7536D236D v8::internal::HeapAllocator::AllocateRawWithRetryOrFailSlowPath+93
10: 00007FF7536DACBA v8::internal::Factory::AllocateRaw+810
11: 00007FF7536F15DA v8::internal::FactoryBase<v8::internal::Factory>::NewFixedArrayWithFiller+90
12: 00007FF7536F18C3 v8::internal::FactoryBase<v8::internal::Factory>::NewFixedArrayWithMap+35
13: 00007FF7534B8EC6 v8::internal::HashTable<v8::internal::NameDictionary,v8::internal::NameDictionaryShape>::EnsureCapacity<v8::internal::Isolate>+246
14: 00007FF7534BFEBD v8::internal::BaseNameDictionary<v8::internal::NameDictionary,v8::internal::NameDictionaryShape>::Add+109
15: 00007FF7533C93D6 v8::internal::Runtime::GetObjectProperty+982
16: 00007FF7538EEB61 v8::internal::SetupIsolateDelegate::SetupHeap+558193
17: 00007FF6D3BB8A39 

요런 애러가 떳다....후..
그래서 나눠서 줘보자로

2-4) 저장 4차시도

import { createReadStream } from 'fs';
import csvParser from 'csv-parser';

async processCSVFile(inputFile: string): Promise<void> {
  const batchSize = 100;
  let currentBatch: any[] = [];

  return new Promise<void>((resolve, reject) => {
    createReadStream(inputFile, { encoding: 'utf-8' })
      .pipe(csvParser())
      .on('error', (error) => {
        console.error('Error reading CSV file:', error);
        reject(error);
      })
      .on('data', (row: any) => {
        if (row['상세영업상태코드'] === '01') {
          currentBatch.push(row);

          if (currentBatch.length >= batchSize) {
            this.processBatch(currentBatch)
              .then(() => {
                currentBatch = [];
              })
              .catch((error) => {
                reject(error);
              });
          }
        }
      })
      .on('end', async () => {
        if (currentBatch.length > 0) {
          await this.processBatch(currentBatch);
        }
        console.log('CSV processing completed.');
        resolve();
      });
  });
}

async processBatch(batch: any[]): Promise<void> {
  for (const row of batch) {
    await this.storesRepository.processCSVFile(row);
  }
}

이번에는 Batch를 넣어줬다. repo는 그대로였다. 100개 씩 넣어주면 100개씩 잘 들어가겠지 했는데 여기서 다음 데이터로 안넘어가는 것이였다.
매우 간단한 문제였다.
100개를 돌려서 넣어야했는데 그부분을 깜빡한것이다. 매우 간단한 곳에서 헤맸었다.

2-5) 성공
repository 수정

async processBatch(rows: any[]): Promise<void> {
  // Process the batch of rows
  for (const rowData of rows) {
    // Process each row as needed
    const La = 11;
    const Ma = 11;
    const description = 'string';
    const maxWaitingCnt = 3;
    const currentWaitingCnt = 3;
    const tableForTwo = Math.floor(Math.random() * 10);
    const tableForFour = Math.floor(Math.random() * 10);
    const storeName = rowData['사업장명'];
    const category = rowData['위생업태명'];
    const address = rowData['도로명전체주소'];

    const store = this.create({
      storeName,
      description,
      maxWaitingCnt,
      currentWaitingCnt,
      La,
      Ma,
      tableForFour,
      tableForTwo,
      category,
      address,
    });

    try {
      const result = await this.save(store);
      console.log('Inserted', result, 'row:', store);
    } catch (error) {
      console.error('Error occurred during insert:', error);
    }
  }
}

이제 순서대로 들어간다.


쿼리 부분을 봐도 typeorm으로 수정하면서 많이 깔끔해지고, 가독성, 유지보수에 좋아진 점을 볼수있다.

서비스와 리포시토리를 조금 더 이쁘게 나눌 생각이다.

import { TypeOrmModuleOptions } from '@nestjs/typeorm';
import { Reviews } from 'src/reviews/reviews.entity';
import { Stores } from 'src/stores/stores.entity';
import { Tables } from 'src/tables/tables.entity';
import { Users } from 'src/users/users.entity';
import { Waitings } from 'src/waitings/waitings.entity';

export const typeORMConfig: TypeOrmModuleOptions = {
  type: 'postgres',
  host: 'localhost',
  port: 5432,
  username: 'postgres',
  password: '****',
  database: '22-project',
  entities: [Stores, Users, Reviews, Waitings, Tables],
  synchronize: true
}

config파일을 사용중이니 pool 부분과 pg부분도 수정해서 올릴예정이다.
겨우 데이터 하나 넣는데 이렇게 시간이 많이 투자 될지 몰랐다. 굉장히 쉬운 방법인데 여러 과정을 수정하고 새롭게 코드들을 뒤집고 이런저런 라이브러리도 사용했다가 지우곤했다.

그리고 리퍼런스를 찾다가 csv 헤더를 "" 하고 , 같이 나누는 부분들이 있는데, ""안에 ""가 있으면 에러가 떠서 이부분에서 많은 삽질을 했다.

예) "충청북도 제천시 청풍면 물태리 290-6번지 "나"동","충청북도 ...."

물론 csvParser()로 간단히 해결된 문제이기도 하다
정리된 서비스 리포:
https://velog.io/@saro3/%EC%A0%95%EB%A6%AC%EB%90%9C-%ED%8C%8C%EC%9D%BC

profile
성장하는 날 위한 기록

0개의 댓글