[Dreamhack] - PATCH1

김성진·2022년 12월 31일

📒 Description

와우 패치를 해야한다.


📒 Exploit

📖 Usage

- 주어진 코드에 존재하는 취약점을 패치해보세요.
- 코드의 기능이 정상 동작하는지 확인하는 SLA도 동작합니다. 
- 코드 수정 후 Submit 하시면 테스트가 시작됩니다.
- "✔ 수정 가능"인 코드만 수정 가능합니다.
- 로컬 환경에서 테스트 후 서버의 검증 과정을 받는 것을 추천드립니다.
- 30초에 한번 제출할 수 있습니다.

- 주의 사항
  - Python Syntax에 주의하세요.
  - socket, execve, ... 등 외부 서버 접속 및 OS 커맨드 사용 불가. 
  - os.environ 등에 저장된 설정 변수 변경 불가.
  - app.run() 임의로 추가 불가.

📖 app.py

#!/usr/bin/python3
from flask import Flask, request, render_template_string, g, session, jsonify
import sqlite3
import os, hashlib

app = Flask(__name__)
# VULNERABLE (hard-coded secret): Flask signs the session cookie with this key,
# so a key that ships in the source lets a reader forge session['uid'].
app.secret_key = "Th1s_1s_V3ry_secret_key"

def get_db():
  """Return the SQLite connection cached on flask.g, opening it on first use.

  The database path is read from the DATABASE environment variable, and the
  connection's row_factory is set to sqlite3.Row on every call.
  """
  conn = getattr(g, '_database', None)
  if conn is None:
    conn = sqlite3.connect(os.environ['DATABASE'])
    g._database = conn
  conn.row_factory = sqlite3.Row
  return conn

def query_db(query, args=(), one=False):
  """Execute *query* with *args* and return what it fetched.

  With one=True the first row is returned, or None when nothing matched;
  otherwise the full list of rows is returned.
  """
  cursor = get_db().execute(query, args)
  rows = cursor.fetchall()
  cursor.close()
  if not one:
    return rows
  if rows:
    return rows[0]
  return None

@app.teardown_appcontext
def close_connection(exception):
  """Close the connection cached on flask.g, if one was ever opened."""
  conn = getattr(g, '_database', None)
  if conn is None:
    return
  conn.close()

@app.route('/')
def index():
  """Root endpoint: answer with the fixed service banner."""
  banner = "api-server"
  return banner

@app.route('/api/me')
def me():
  """Report the session's uid as JSON, or null when no uid is set."""
  uid = session.get('uid')
  return jsonify(userid=uid if uid else None)

@app.route('/api/login', methods=['POST'])
def login():
  """Log in with the 'userid' and 'password' form fields.

  When the query returns a row, its first column is stored in session['uid']
  and returned with result="success"; otherwise the response is result="fail".
  """
  userid = request.form.get('userid', '')
  password = request.form.get('password', '')
  if userid and password:
    # VULNERABLE (SQL injection): the raw userid form value is interpolated into
    # the SQL text instead of being bound with a '?' placeholder. The password
    # only ever reaches the query as its SHA-256 hex digest.
    ret = query_db(f"SELECT * FROM users where userid='{userid}' and password='{hashlib.sha256(password.encode()).hexdigest()}'" , one=True)
    if ret:
      session['uid'] = ret[0]
      return jsonify(result="success", userid=ret[0])
  return jsonify(result="fail")

@app.route('/api/logout')
def logout():
  """Drop 'uid' from the session (a no-op if it is absent) and report success."""
  session.pop('uid', None)
  response = jsonify(result="success")
  return response

@app.route('/api/join', methods=['POST'])
def join():
  """Register a user from the 'userid' and 'password' form fields.

  The password is stored as its SHA-256 hex digest. Returns result="error"
  when either field is empty, result="success" after the insert is committed.
  """
  userid = request.form.get('userid', '')
  password = request.form.get('password', '')
  if not userid or not password:
    return jsonify(result="error")
  db = get_db()
  cursor = db.cursor()
  pw_hash = hashlib.sha256(password.encode()).hexdigest()
  cursor.execute("Insert into users values(?, ?);", (userid, pw_hash))
  db.commit()
  return jsonify(result="success")

@app.route('/api/memo/add', methods=['PUT'])
def memoAdd():
  """Create a memo owned by the logged-in user.

  Returns result="no login" without a session uid, result="error" when the
  'title' or 'contents' form field is missing or empty, and otherwise
  result="success" together with the new row id as memoidx.
  """
  owner = session.get('uid')
  if not owner:
    return jsonify(result="no login")

  title = request.form.get('title')
  contents = request.form.get('contents')
  if not (title and contents):
    return jsonify(result="error")

  db = get_db()
  cursor = db.cursor()
  cursor.execute("Insert into memo(userid, title, contents) values(?, ?, ?);", (owner, title, contents))
  db.commit()
  return jsonify(result="success", memoidx=cursor.lastrowid)

@app.route('/api/memo/<idx>', methods=['GET'])
def memoView(idx):
  """Return the memo selected by the URL's idx segment.

  The 'mode' query argument picks the output: 'html' renders a small HTML
  snippet, anything else (default 'json') returns the fields as JSON.
  """
  mode = request.args.get('mode', 'json')
  # VULNERABLE (SQL injection): idx is the unconverted URL string and is
  # concatenated straight into the SQL text.
  # NOTE: [0] on an empty result list raises IndexError before `if ret` runs.
  ret = query_db("SELECT * FROM memo where idx=" + idx)[0]
  if ret:
    userid = ret['userid']
    title = ret['title']
    contents = ret['contents']
    if mode == 'html':
      # VULNERABLE (SSTI): the stored memo fields are formatted into the
      # template source itself, so template syntax inside a memo is
      # evaluated by render_template_string.
      template = ''' Written by {userid}<h3>{title}</h3>
      <pre>{contents}</pre>
      '''.format(title=title, userid=userid, contents=contents)
      return render_template_string(template)
    else:
      return jsonify(result="success",
        userid=userid,
        title=title,
        contents=contents)
  return jsonify(result="error")

@app.route('/api/memo/<int:idx>', methods=['PUT'])
def memoUpdate(idx):
  """Overwrite the title and contents of memo *idx* from the request form.

  Returns result="no login" without a session uid, result="success" after the
  UPDATE is committed, and result="error" when title or contents is missing.
  """
  if not session.get('uid'):
    return jsonify(result="no login")

  # NOTE: [0] on an empty result list raises IndexError, so a nonexistent idx
  # fails here instead of reaching the result="error" return.
  ret = query_db('SELECT * FROM memo where idx=?', [idx,])[0]
  # VULNERABLE (IDOR): userid is read but never compared with ret['userid'],
  # so any logged-in user can overwrite any memo.
  userid = session.get('uid')
  title = request.form.get('title')
  contents = request.form.get('contents')

  if ret and title and contents:
    conn = get_db()
    cur = conn.cursor()
    updateRet = cur.execute("UPDATE memo SET title=?, contents=? WHERE idx=?",(title, contents, idx))
    conn.commit()
    # NOTE(review): execute() returns the cursor, so this check looks always
    # true rather than reflecting whether a row changed - confirm.
    if updateRet:
      return jsonify(result="success")
  return jsonify(result="error")

경험상 Flask를 쓰면 SSTI가 존재하는지부터 확인해야 하는데, render_template_string을 Ctrl+F로 찾아보면 된다. 그렇게 찾아보면 정말 터지기 딱 좋은 부분이 존재한다. 이걸 어떻게 패치할 수 있을까? 사용자 입력을 format으로 템플릿 문자열에 직접 끼워 넣지 말고, 딕셔너리를 사용하여 템플릿 변수로 값을 넘겨주면 된다. (사실 이 내용을 어디서 들었는데 그 레퍼런스를 찾을 수가 없다...)

하지만 Unknown Error가 발생한다. 어라 template에 중괄호가 한 개 밖에 없다.

우선 패치한 app.py는 다음과 같다.

#!/usr/bin/python3
from flask import Flask, request, render_template_string, g, session, jsonify
import sqlite3
import os, hashlib

app = Flask(__name__)
# VULNERABLE (hard-coded secret): Flask signs the session cookie with this key,
# so a key that ships in the source lets a reader forge session['uid'].
app.secret_key = "Th1s_1s_V3ry_secret_key"

def get_db():
  """Return the SQLite connection cached on flask.g, opening it on first use.

  The database path is read from the DATABASE environment variable, and the
  connection's row_factory is set to sqlite3.Row on every call.
  """
  conn = getattr(g, '_database', None)
  if conn is None:
    conn = sqlite3.connect(os.environ['DATABASE'])
    g._database = conn
  conn.row_factory = sqlite3.Row
  return conn

def query_db(query, args=(), one=False):
  """Execute *query* with *args* and return what it fetched.

  With one=True the first row is returned, or None when nothing matched;
  otherwise the full list of rows is returned.
  """
  cursor = get_db().execute(query, args)
  rows = cursor.fetchall()
  cursor.close()
  if not one:
    return rows
  if rows:
    return rows[0]
  return None

@app.teardown_appcontext
def close_connection(exception):
  """Close the connection cached on flask.g, if one was ever opened."""
  conn = getattr(g, '_database', None)
  if conn is None:
    return
  conn.close()

@app.route('/')
def index():
  """Root endpoint: answer with the fixed service banner."""
  banner = "api-server"
  return banner

@app.route('/api/me')
def me():
  """Report the session's uid as JSON, or null when no uid is set."""
  uid = session.get('uid')
  return jsonify(userid=uid if uid else None)

@app.route('/api/login', methods=['POST'])
def login():
  """Log in with the 'userid' and 'password' form fields.

  When the query returns a row, its first column is stored in session['uid']
  and returned with result="success"; otherwise the response is result="fail".
  """
  userid = request.form.get('userid', '')
  password = request.form.get('password', '')
  if userid and password:
    # VULNERABLE (SQL injection): the raw userid form value is interpolated into
    # the SQL text instead of being bound with a '?' placeholder. The password
    # only ever reaches the query as its SHA-256 hex digest.
    ret = query_db(f"SELECT * FROM users where userid='{userid}' and password='{hashlib.sha256(password.encode()).hexdigest()}'" , one=True)
    if ret:
      session['uid'] = ret[0]
      return jsonify(result="success", userid=ret[0])
  return jsonify(result="fail")

@app.route('/api/logout')
def logout():
  """Drop 'uid' from the session (a no-op if it is absent) and report success."""
  session.pop('uid', None)
  response = jsonify(result="success")
  return response

@app.route('/api/join', methods=['POST'])
def join():
  """Register a user from the 'userid' and 'password' form fields.

  The password is stored as its SHA-256 hex digest. Returns result="error"
  when either field is empty, result="success" after the insert is committed.
  """
  userid = request.form.get('userid', '')
  password = request.form.get('password', '')
  if not userid or not password:
    return jsonify(result="error")
  db = get_db()
  cursor = db.cursor()
  pw_hash = hashlib.sha256(password.encode()).hexdigest()
  cursor.execute("Insert into users values(?, ?);", (userid, pw_hash))
  db.commit()
  return jsonify(result="success")

@app.route('/api/memo/add', methods=['PUT'])
def memoAdd():
  """Create a memo owned by the logged-in user.

  Returns result="no login" without a session uid, result="error" when the
  'title' or 'contents' form field is missing or empty, and otherwise
  result="success" together with the new row id as memoidx.
  """
  owner = session.get('uid')
  if not owner:
    return jsonify(result="no login")

  title = request.form.get('title')
  contents = request.form.get('contents')
  if not (title and contents):
    return jsonify(result="error")

  db = get_db()
  cursor = db.cursor()
  cursor.execute("Insert into memo(userid, title, contents) values(?, ?, ?);", (owner, title, contents))
  db.commit()
  return jsonify(result="success", memoidx=cursor.lastrowid)

@app.route('/api/memo/<idx>', methods=['GET'])
def memoView(idx):
  """Return the memo selected by the URL's idx segment.

  The 'mode' query argument picks the output: 'html' renders a small HTML
  snippet, anything else (default 'json') returns the fields as JSON.
  """
  mode = request.args.get('mode', 'json')
  # VULNERABLE (SQL injection): idx is the unconverted URL string and is
  # concatenated straight into the SQL text.
  # NOTE: [0] on an empty result list raises IndexError before `if ret` runs.
  ret = query_db("SELECT * FROM memo where idx=" + idx)[0]
  if ret:
    userid = ret['userid']
    title = ret['title']
    contents = ret['contents']
    if mode == 'html':
      # The template source is a constant; the memo fields are handed over as
      # template variables instead of being formatted into the source.
      template = ''' Written by {{userid}}<h3>{{title}}</h3>
      <pre>{{contents}}</pre>
      '''
      dict_param = {
        'userid':userid,
        'title':title,
        'contents':contents
      }
      return render_template_string(template, **dict_param)
    else:
      return jsonify(result="success",
        userid=userid,
        title=title,
        contents=contents)
  return jsonify(result="error")

@app.route('/api/memo/<int:idx>', methods=['PUT'])
def memoUpdate(idx):
  """Overwrite the title and contents of memo *idx* from the request form.

  Returns result="no login" without a session uid, result="success" after the
  UPDATE is committed, and result="error" when title or contents is missing.
  """
  if not session.get('uid'):
    return jsonify(result="no login")

  # NOTE: [0] on an empty result list raises IndexError, so a nonexistent idx
  # fails here instead of reaching the result="error" return.
  ret = query_db('SELECT * FROM memo where idx=?', [idx,])[0]
  # VULNERABLE (IDOR): userid is read but never compared with ret['userid'],
  # so any logged-in user can overwrite any memo.
  userid = session.get('uid')
  title = request.form.get('title')
  contents = request.form.get('contents')

  if ret and title and contents:
    conn = get_db()
    cur = conn.cursor()
    updateRet = cur.execute("UPDATE memo SET title=?, contents=? WHERE idx=?",(title, contents, idx))
    conn.commit()
    # NOTE(review): execute() returns the cursor, so this check looks always
    # true rather than reflecting whether a row changed - confirm.
    if updateRet:
      return jsonify(result="success")
  return jsonify(result="error")

Memo Update IDOR은 뭘까?
사실 IDOR의 개념을 처음 들어보았다.
https://www.hahwul.com/cullinan/idor/

그러니까 음 우리가 받은 idx가 조작되었는지 확인을 해주어야 한다.
이제 위의 두 취약점만 남았다.
잉 Hard-coded key는 os.urandom(32)로 바꾸어주면 될 것 같다.

#!/usr/bin/python3
from flask import Flask, request, render_template_string, g, session, jsonify
import sqlite3
import os, hashlib

app = Flask(__name__)
# Fresh random 32-byte signing key each time the module is loaded; stored under
# the SECRET_KEY config entry, which is what app.secret_key reads and writes.
app.config['SECRET_KEY'] = os.urandom(32)

def get_db():
  """Return the SQLite connection cached on flask.g, opening it on first use.

  The database path is read from the DATABASE environment variable, and the
  connection's row_factory is set to sqlite3.Row on every call.
  """
  conn = getattr(g, '_database', None)
  if conn is None:
    conn = sqlite3.connect(os.environ['DATABASE'])
    g._database = conn
  conn.row_factory = sqlite3.Row
  return conn

def query_db(query, args=(), one=False):
  """Execute *query* with *args* and return what it fetched.

  With one=True the first row is returned, or None when nothing matched;
  otherwise the full list of rows is returned.
  """
  cursor = get_db().execute(query, args)
  rows = cursor.fetchall()
  cursor.close()
  if not one:
    return rows
  if rows:
    return rows[0]
  return None

@app.teardown_appcontext
def close_connection(exception):
  """Close the connection cached on flask.g, if one was ever opened."""
  conn = getattr(g, '_database', None)
  if conn is None:
    return
  conn.close()

@app.route('/')
def index():
  """Root endpoint: answer with the fixed service banner."""
  banner = "api-server"
  return banner

@app.route('/api/me')
def me():
  """Report the session's uid as JSON, or null when no uid is set."""
  uid = session.get('uid')
  return jsonify(userid=uid if uid else None)

@app.route('/api/login', methods=['POST'])
def login():
  """Log in with the 'userid' and 'password' form fields.

  When the query returns a row, its first column is stored in session['uid']
  and returned with result="success"; otherwise the response is result="fail".
  """
  userid = request.form.get('userid', '')
  password = request.form.get('password', '')
  if userid and password:
    # VULNERABLE (SQL injection): the raw userid form value is interpolated into
    # the SQL text instead of being bound with a '?' placeholder. The password
    # only ever reaches the query as its SHA-256 hex digest.
    ret = query_db(f"SELECT * FROM users where userid='{userid}' and password='{hashlib.sha256(password.encode()).hexdigest()}'" , one=True)
    if ret:
      session['uid'] = ret[0]
      return jsonify(result="success", userid=ret[0])
  return jsonify(result="fail")

@app.route('/api/logout')
def logout():
  """Drop 'uid' from the session (a no-op if it is absent) and report success."""
  session.pop('uid', None)
  response = jsonify(result="success")
  return response

@app.route('/api/join', methods=['POST'])
def join():
  """Register a user from the 'userid' and 'password' form fields.

  The password is stored as its SHA-256 hex digest. Returns result="error"
  when either field is empty, result="success" after the insert is committed.
  """
  userid = request.form.get('userid', '')
  password = request.form.get('password', '')
  if not userid or not password:
    return jsonify(result="error")
  db = get_db()
  cursor = db.cursor()
  pw_hash = hashlib.sha256(password.encode()).hexdigest()
  cursor.execute("Insert into users values(?, ?);", (userid, pw_hash))
  db.commit()
  return jsonify(result="success")

@app.route('/api/memo/add', methods=['PUT'])
def memoAdd():
  """Create a memo owned by the logged-in user.

  Returns result="no login" without a session uid, result="error" when the
  'title' or 'contents' form field is missing or empty, and otherwise
  result="success" together with the new row id as memoidx.
  """
  owner = session.get('uid')
  if not owner:
    return jsonify(result="no login")

  title = request.form.get('title')
  contents = request.form.get('contents')
  if not (title and contents):
    return jsonify(result="error")

  db = get_db()
  cursor = db.cursor()
  cursor.execute("Insert into memo(userid, title, contents) values(?, ?, ?);", (owner, title, contents))
  db.commit()
  return jsonify(result="success", memoidx=cursor.lastrowid)

@app.route('/api/memo/<idx>', methods=['GET'])
def memoView(idx):
  """Return the memo selected by the URL's idx segment.

  The 'mode' query argument picks the output: 'html' renders a small HTML
  snippet, anything else (default 'json') returns the fields as JSON.
  """
  mode = request.args.get('mode', 'json')
  # VULNERABLE (SQL injection): idx is the unconverted URL string and is
  # concatenated straight into the SQL text.
  # NOTE: [0] on an empty result list raises IndexError before `if ret` runs.
  ret = query_db("SELECT * FROM memo where idx=" + idx)[0]
  if ret:
    userid = ret['userid']
    title = ret['title']
    contents = ret['contents']
    if mode == 'html':
      # The template source is a constant; the memo fields are handed over as
      # template variables instead of being formatted into the source.
      template = ''' Written by {{userid}}<h3>{{title}}</h3>
      <pre>{{contents}}</pre>
      '''
      dict_param = {
        'userid':userid,
        'title':title,
        'contents':contents
      }
      return render_template_string(template, **dict_param)
    else:
      return jsonify(result="success",
        userid=userid,
        title=title,
        contents=contents)
  return jsonify(result="error")

@app.route('/api/memo/<int:idx>', methods=['PUT'])
def memoUpdate(idx):
  """Update the title and contents of memo *idx* on behalf of its owner.

  Returns result="no login" without a session uid. Returns result="error"
  when the memo does not exist, belongs to another user, or the 'title' or
  'contents' form field is missing or empty. Returns result="success" once
  the UPDATE is committed.
  """
  userid = session.get('uid')
  if not userid:
    return jsonify(result="no login")

  # one=True yields None for a missing memo; indexing [0] on the empty result
  # list raised IndexError before any check could run.
  memo = query_db('SELECT * FROM memo where idx=?', [idx], one=True)
  # Ownership check: only the memo's author may modify it (prevents IDOR).
  if memo is None or memo['userid'] != userid:
    return jsonify(result="error")

  title = request.form.get('title')
  contents = request.form.get('contents')
  if not (title and contents):
    return jsonify(result="error")

  conn = get_db()
  cur = conn.cursor()
  cur.execute("UPDATE memo SET title=?, contents=? WHERE idx=?", (title, contents, idx))
  conn.commit()
  return jsonify(result="success")

sql injection 취약점은 어떻게 막아야 하지 ...?
근데 쿼리를 잘 보면 대부분 '?' 을 사용하여 포맷팅을 하는데

query_db(f"SELECT * FROM users where userid='{userid}' and password='{hashlib.sha256(password.encode()).hexdigest()}'" , one=True)

여기만 눈에 잘 띈다. 검색을 해보니 ?를 통해 받아야 안전하다고 한다.
따라서 해당 부분을

query_db("SELECT * FROM users where userid = ? and password = ?" , (userid, password), one=True)

이렇게 바꾸었다. 근데 지금까지 잘 패치된 내용들도 에러가 났고, SLA도 실패했다. 그런데 SqlI는 또 막았다 ...?

어 그런데 인코딩을 안해주었다. 아ㅏㅏㅏ 이 부분을 틀렸구나

#!/usr/bin/python3
from flask import Flask, request, render_template_string, g, session, jsonify
import sqlite3
import os, hashlib

app = Flask(__name__)
# Fresh random 32-byte signing key each time the module is loaded; stored under
# the SECRET_KEY config entry, which is what app.secret_key reads and writes.
app.config['SECRET_KEY'] = os.urandom(32)

def get_db():
  """Return the SQLite connection cached on flask.g, opening it on first use.

  The database path is read from the DATABASE environment variable, and the
  connection's row_factory is set to sqlite3.Row on every call.
  """
  conn = getattr(g, '_database', None)
  if conn is None:
    conn = sqlite3.connect(os.environ['DATABASE'])
    g._database = conn
  conn.row_factory = sqlite3.Row
  return conn

def query_db(query, args=(), one=False):
  """Execute *query* with *args* and return what it fetched.

  With one=True the first row is returned, or None when nothing matched;
  otherwise the full list of rows is returned.
  """
  cursor = get_db().execute(query, args)
  rows = cursor.fetchall()
  cursor.close()
  if not one:
    return rows
  if rows:
    return rows[0]
  return None

@app.teardown_appcontext
def close_connection(exception):
  """Close the connection cached on flask.g, if one was ever opened."""
  conn = getattr(g, '_database', None)
  if conn is None:
    return
  conn.close()

@app.route('/')
def index():
  """Root endpoint: answer with the fixed service banner."""
  banner = "api-server"
  return banner

@app.route('/api/me')
def me():
  """Report the session's uid as JSON, or null when no uid is set."""
  uid = session.get('uid')
  return jsonify(userid=uid if uid else None)

@app.route('/api/login', methods=['POST'])
def login():
  """Log in with the 'userid' and 'password' form fields.

  Both the userid and the password's SHA-256 hex digest are passed as bound
  parameters. When a row matches, its first column is stored in
  session['uid'] and returned with result="success"; otherwise the response
  is result="fail".
  """
  userid = request.form.get('userid', '')
  password = request.form.get('password', '')
  if not (userid and password):
    return jsonify(result="fail")

  digest = hashlib.sha256(password.encode()).hexdigest()
  row = query_db("SELECT * FROM users where userid = ? and password = ?", (userid, digest), one=True)
  if not row:
    return jsonify(result="fail")

  session['uid'] = row[0]
  return jsonify(result="success", userid=row[0])

@app.route('/api/logout')
def logout():
  """Drop 'uid' from the session (a no-op if it is absent) and report success."""
  session.pop('uid', None)
  response = jsonify(result="success")
  return response

@app.route('/api/join', methods=['POST'])
def join():
  """Register a user from the 'userid' and 'password' form fields.

  The password is stored as its SHA-256 hex digest. Returns result="error"
  when either field is empty, result="success" after the insert is committed.
  """
  userid = request.form.get('userid', '')
  password = request.form.get('password', '')
  if not userid or not password:
    return jsonify(result="error")
  db = get_db()
  cursor = db.cursor()
  pw_hash = hashlib.sha256(password.encode()).hexdigest()
  cursor.execute("Insert into users values(?, ?);", (userid, pw_hash))
  db.commit()
  return jsonify(result="success")

@app.route('/api/memo/add', methods=['PUT'])
def memoAdd():
  """Create a memo owned by the logged-in user.

  Returns result="no login" without a session uid, result="error" when the
  'title' or 'contents' form field is missing or empty, and otherwise
  result="success" together with the new row id as memoidx.
  """
  owner = session.get('uid')
  if not owner:
    return jsonify(result="no login")

  title = request.form.get('title')
  contents = request.form.get('contents')
  if not (title and contents):
    return jsonify(result="error")

  db = get_db()
  cursor = db.cursor()
  cursor.execute("Insert into memo(userid, title, contents) values(?, ?, ?);", (owner, title, contents))
  db.commit()
  return jsonify(result="success", memoidx=cursor.lastrowid)

@app.route('/api/memo/<idx>', methods=['GET'])
def memoView(idx):
  """Return the memo selected by the URL's idx segment.

  The 'mode' query argument picks the output: 'html' renders a small HTML
  snippet, anything else (default 'json') returns the fields as JSON.
  """
  mode = request.args.get('mode', 'json')
  # VULNERABLE (SQL injection): idx is the unconverted URL string and is
  # concatenated straight into the SQL text.
  # NOTE: [0] on an empty result list raises IndexError before `if ret` runs.
  ret = query_db("SELECT * FROM memo where idx=" + idx)[0]
  if ret:
    userid = ret['userid']
    title = ret['title']
    contents = ret['contents']
    if mode == 'html':
      # The template source is a constant; the memo fields are handed over as
      # template variables instead of being formatted into the source.
      template = ''' Written by {{userid}}<h3>{{title}}</h3>
      <pre>{{contents}}</pre>
      '''
      dict_param = {
        'userid':userid,
        'title':title,
        'contents':contents
      }
      return render_template_string(template, **dict_param)
    else:
      return jsonify(result="success",
        userid=userid,
        title=title,
        contents=contents)
  return jsonify(result="error")

@app.route('/api/memo/<int:idx>', methods=['PUT'])
def memoUpdate(idx):
  """Update the title and contents of memo *idx* on behalf of its owner.

  Returns result="no login" without a session uid. Returns result="error"
  when the memo does not exist, belongs to another user, or the 'title' or
  'contents' form field is missing or empty. Returns result="success" once
  the UPDATE is committed.
  """
  userid = session.get('uid')
  if not userid:
    return jsonify(result="no login")

  # one=True yields None for a missing memo; indexing [0] on the empty result
  # list raised IndexError before any check could run.
  memo = query_db('SELECT * FROM memo where idx=?', [idx], one=True)
  # Ownership check: only the memo's author may modify it (prevents IDOR).
  if memo is None or memo['userid'] != userid:
    return jsonify(result="error")

  title = request.form.get('title')
  contents = request.form.get('contents')
  if not (title and contents):
    return jsonify(result="error")

  conn = get_db()
  cur = conn.cursor()
  cur.execute("UPDATE memo SET title=?, contents=? WHERE idx=?", (title, contents, idx))
  conn.commit()
  return jsonify(result="success")

어디서 SqlI가 계속 터지는 거지 ?
아놔

ret = query_db("SELECT * FROM memo where idx=" + idx)[0]

이 부분이 계속 있었다.

ret = query_db('SELECT * FROM memo where idx=?', [idx,])[0]

이렇게 바꾸어주자...


📒 Success

profile
Today I Learned

0개의 댓글