와우 패치를 해야한다.
- 주어진 코드에 존재하는 취약점을 패치해보세요.
- 코드의 기능이 정상 동작하는지 확인하는 SLA도 동작합니다.
- 코드 수정 후 Submit 하시면 테스트가 시작됩니다.
- "✔ 수정 가능"인 코드만 수정 가능합니다.
- 로컬 환경에서 테스트 후 서버의 검증 과정을 받는 것을 추천드립니다.
- 30초에 한번 제출할 수 있습니다.
- 주의 사항
- Python Syntax에 주의하세요.
- socket, execve, ... 등 외부 서버 접속 및 OS 커맨드 사용 불가.
- os.environ 등에 저장된 설정 변수 변경 불가.
- app.run() 임의로 추가 불가.
#!/usr/bin/python3
from flask import Flask, request, render_template_string, g, session, jsonify
import sqlite3
import os, hashlib
# Create the Flask application object for this module.
app = Flask(__name__)
# The secret key is a string literal embedded directly in the source file.
app.secret_key = "Th1s_1s_V3ry_secret_key"
def get_db():
    """Return the SQLite connection cached on ``g``, opening it on first use.

    The database path is read from the ``DATABASE`` environment variable.
    Rows are returned as ``sqlite3.Row`` objects.
    """
    conn = getattr(g, '_database', None)
    if conn is not None:
        return conn
    # Nothing cached yet: open the connection and remember it on g.
    conn = sqlite3.connect(os.environ['DATABASE'])
    g._database = conn
    conn.row_factory = sqlite3.Row
    return conn
def query_db(query, args=(), one=False):
    """Execute ``query`` with ``args`` and return the fetched rows.

    With ``one=True`` only the first row is returned, or ``None`` when the
    result set is empty; otherwise the full list of rows is returned.
    """
    cursor = get_db().execute(query, args)
    rows = cursor.fetchall()
    cursor.close()
    if not one:
        return rows
    return rows[0] if rows else None
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection cached on ``g``, if one was opened."""
    conn = getattr(g, '_database', None)
    if conn is None:
        return
    conn.close()
@app.route('/')
def index():
    """Serve the fixed banner string at the site root."""
    banner = "api-server"
    return banner
@app.route('/api/me')
def me():
    """Report the ``uid`` stored in the session, or ``None`` when it is unset."""
    uid = session.get('uid')
    return jsonify(userid=uid if uid else None)
@app.route('/api/login', methods=['POST'])
def login():
    """Check the posted ``userid``/``password`` and store the user in the session.

    The password is compared as a SHA-256 hex digest. Note that the SQL text
    is assembled with an f-string from the posted ``userid`` instead of being
    bound as a query parameter.
    """
    userid = request.form.get('userid', '')
    password = request.form.get('password', '')
    if not (userid and password):
        return jsonify(result="fail")
    ret = query_db(f"SELECT * FROM users where userid='{userid}' and password='{hashlib.sha256(password.encode()).hexdigest()}'" , one=True)
    if not ret:
        return jsonify(result="fail")
    # The first column of the matched row becomes the session uid.
    session['uid'] = ret[0]
    return jsonify(result="success", userid=ret[0])
@app.route('/api/logout')
def logout():
    """Drop ``uid`` from the session (if present) and report success."""
    session.pop('uid', None)
    response = jsonify(result="success")
    return response
@app.route('/api/join', methods=['POST'])
def join():
    """Insert a new row into ``users`` from the posted ``userid``/``password``.

    The password is stored as a SHA-256 hex digest. Responds with ``error``
    when either field is empty.
    """
    userid = request.form.get('userid', '')
    password = request.form.get('password', '')
    if not userid or not password:
        return jsonify(result="error")
    pw_hash = hashlib.sha256(password.encode()).hexdigest()
    db = get_db()
    db.cursor().execute("Insert into users values(?, ?);", (userid, pw_hash))
    db.commit()
    return jsonify(result="success")
@app.route('/api/memo/add', methods=['PUT'])
def memoAdd():
    """Insert a memo owned by the session user and return its row id.

    Responds with ``no login`` when the session has no ``uid`` and with
    ``error`` when ``title`` or ``contents`` is missing.
    """
    userid = session.get('uid')
    if not userid:
        return jsonify(result="no login")
    title = request.form.get('title')
    contents = request.form.get('contents')
    if not (title and contents):
        return jsonify(result="error")
    db = get_db()
    inserted = db.cursor().execute(
        "Insert into memo(userid, title, contents) values(?, ?, ?);",
        (userid, title, contents),
    )
    db.commit()
    return jsonify(result="success", memoidx=inserted.lastrowid)
@app.route('/api/memo/<idx>', methods=['GET'])
def memoView(idx):
    """Return one memo as JSON (default) or as HTML when ``mode=html``.

    Note that ``idx`` is concatenated into the SQL text, and in HTML mode the
    memo fields are substituted into the template source with ``str.format``
    before ``render_template_string`` is called.
    """
    mode = request.args.get('mode', 'json')
    ret = query_db("SELECT * FROM memo where idx=" + idx)[0]
    if not ret:
        return jsonify(result="error")
    fields = {key: ret[key] for key in ('userid', 'title', 'contents')}
    if mode != 'html':
        return jsonify(result="success", **fields)
    template = ''' Written by {userid}<h3>{title}</h3>
<pre>{contents}</pre>
'''.format(**fields)
    return render_template_string(template)
@app.route('/api/memo/<int:idx>', methods=['PUT'])
def memoUpdate(idx):
    """Overwrite the title and contents of memo ``idx``.

    Only the presence of a session ``uid`` is checked; the uid is not compared
    with the memo's ``userid`` column.
    """
    if not session.get('uid'):
        return jsonify(result="no login")
    memo = query_db('SELECT * FROM memo where idx=?', [idx])[0]
    title = request.form.get('title')
    contents = request.form.get('contents')
    if not (memo and title and contents):
        return jsonify(result="error")
    db = get_db()
    update_cursor = db.cursor().execute(
        "UPDATE memo SET title=?, contents=? WHERE idx=?",
        (title, contents, idx),
    )
    db.commit()
    if update_cursor:
        return jsonify(result="success")
    return jsonify(result="error")
경험상 flask를 쓰면 ssti가 혹시 존재하는 지 봐야하는데 render_template 이거 그냥 Ctrl+F해서 찾아보면 된다.
그렇다면 정말 터지기 딱 좋은 부분이 존재한다. 이걸 어떻게 패치할 수 있을까? 바로 format을 사용하는 게 아니라 딕셔너리를 사용하여 값을 넘겨주면 된다. (사실 이 내용 어디서 들었는데 그 레퍼런스를 찾을 수가 없다...)
하지만 Unknown Error가 발생한다. 어라, template에 중괄호가 한 개밖에 없다. Jinja2에서 변수를 출력하려면 {{ }}처럼 중괄호를 두 개 써야 한다.
우선 패치한 app.py는 다음과 같다.
#!/usr/bin/python3
from flask import Flask, request, render_template_string, g, session, jsonify
import sqlite3
import os, hashlib
# Create the Flask application object for this module.
app = Flask(__name__)
# The secret key is a string literal embedded directly in the source file.
app.secret_key = "Th1s_1s_V3ry_secret_key"
def get_db():
    """Return the SQLite connection cached on ``g``, opening it on first use.

    The database path is read from the ``DATABASE`` environment variable.
    Rows are returned as ``sqlite3.Row`` objects.
    """
    conn = getattr(g, '_database', None)
    if conn is not None:
        return conn
    # Nothing cached yet: open the connection and remember it on g.
    conn = sqlite3.connect(os.environ['DATABASE'])
    g._database = conn
    conn.row_factory = sqlite3.Row
    return conn
def query_db(query, args=(), one=False):
    """Execute ``query`` with ``args`` and return the fetched rows.

    With ``one=True`` only the first row is returned, or ``None`` when the
    result set is empty; otherwise the full list of rows is returned.
    """
    cursor = get_db().execute(query, args)
    rows = cursor.fetchall()
    cursor.close()
    if not one:
        return rows
    return rows[0] if rows else None
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection cached on ``g``, if one was opened."""
    conn = getattr(g, '_database', None)
    if conn is None:
        return
    conn.close()
@app.route('/')
def index():
    """Serve the fixed banner string at the site root."""
    banner = "api-server"
    return banner
@app.route('/api/me')
def me():
    """Report the ``uid`` stored in the session, or ``None`` when it is unset."""
    uid = session.get('uid')
    return jsonify(userid=uid if uid else None)
@app.route('/api/login', methods=['POST'])
def login():
    """Check the posted ``userid``/``password`` and store the user in the session.

    The password is compared as a SHA-256 hex digest. Note that the SQL text
    is assembled with an f-string from the posted ``userid`` instead of being
    bound as a query parameter.
    """
    userid = request.form.get('userid', '')
    password = request.form.get('password', '')
    if not (userid and password):
        return jsonify(result="fail")
    ret = query_db(f"SELECT * FROM users where userid='{userid}' and password='{hashlib.sha256(password.encode()).hexdigest()}'" , one=True)
    if not ret:
        return jsonify(result="fail")
    # The first column of the matched row becomes the session uid.
    session['uid'] = ret[0]
    return jsonify(result="success", userid=ret[0])
@app.route('/api/logout')
def logout():
    """Drop ``uid`` from the session (if present) and report success."""
    session.pop('uid', None)
    response = jsonify(result="success")
    return response
@app.route('/api/join', methods=['POST'])
def join():
    """Insert a new row into ``users`` from the posted ``userid``/``password``.

    The password is stored as a SHA-256 hex digest. Responds with ``error``
    when either field is empty.
    """
    userid = request.form.get('userid', '')
    password = request.form.get('password', '')
    if not userid or not password:
        return jsonify(result="error")
    pw_hash = hashlib.sha256(password.encode()).hexdigest()
    db = get_db()
    db.cursor().execute("Insert into users values(?, ?);", (userid, pw_hash))
    db.commit()
    return jsonify(result="success")
@app.route('/api/memo/add', methods=['PUT'])
def memoAdd():
    """Insert a memo owned by the session user and return its row id.

    Responds with ``no login`` when the session has no ``uid`` and with
    ``error`` when ``title`` or ``contents`` is missing.
    """
    userid = session.get('uid')
    if not userid:
        return jsonify(result="no login")
    title = request.form.get('title')
    contents = request.form.get('contents')
    if not (title and contents):
        return jsonify(result="error")
    db = get_db()
    inserted = db.cursor().execute(
        "Insert into memo(userid, title, contents) values(?, ?, ?);",
        (userid, title, contents),
    )
    db.commit()
    return jsonify(result="success", memoidx=inserted.lastrowid)
@app.route('/api/memo/<idx>', methods=['GET'])
def memoView(idx):
    """Return one memo as JSON (default) or as HTML when ``mode=html``.

    In HTML mode the memo fields are passed to ``render_template_string`` as
    template variables. Note that ``idx`` is still concatenated into the SQL
    text.
    """
    mode = request.args.get('mode', 'json')
    ret = query_db("SELECT * FROM memo where idx=" + idx)[0]
    if not ret:
        return jsonify(result="error")
    fields = {key: ret[key] for key in ('userid', 'title', 'contents')}
    if mode != 'html':
        return jsonify(result="success", **fields)
    # The template source is constant; memo values only arrive as variables.
    template = ''' Written by {{userid}}<h3>{{title}}</h3>
<pre>{{contents}}</pre>
'''
    return render_template_string(template, **fields)
@app.route('/api/memo/<int:idx>', methods=['PUT'])
def memoUpdate(idx):
    """Overwrite the title and contents of memo ``idx``.

    Only the presence of a session ``uid`` is checked; the uid is not compared
    with the memo's ``userid`` column.
    """
    if not session.get('uid'):
        return jsonify(result="no login")
    memo = query_db('SELECT * FROM memo where idx=?', [idx])[0]
    title = request.form.get('title')
    contents = request.form.get('contents')
    if not (memo and title and contents):
        return jsonify(result="error")
    db = get_db()
    update_cursor = db.cursor().execute(
        "UPDATE memo SET title=?, contents=? WHERE idx=?",
        (title, contents, idx),
    )
    db.commit()
    if update_cursor:
        return jsonify(result="success")
    return jsonify(result="error")

Memo Update IDOR은 뭘까?
사실 IDOR의 개념을 처음 들어보았다.
https://www.hahwul.com/cullinan/idor/
그러니까 요청으로 받은 idx의 메모가 현재 로그인한 사용자(session의 uid)가 작성한 메모인지 확인해 주어야 한다.

이제 위의 두 취약점만 남았다.
잉 Hard-coded key는 os.urandom(32)로 바꾸어주면 될 것 같다.
#!/usr/bin/python3
from flask import Flask, request, render_template_string, g, session, jsonify
import sqlite3
import os, hashlib
# Create the Flask application object for this module.
app = Flask(__name__)
# The secret key is 32 bytes from os.urandom, generated anew each time this
# module is loaded.
# NOTE(review): sessions issued under an earlier key presumably stop
# validating after a restart — confirm that is acceptable.
app.secret_key = os.urandom(32)
def get_db():
    """Return the SQLite connection cached on ``g``, opening it on first use.

    The database path is read from the ``DATABASE`` environment variable.
    Rows are returned as ``sqlite3.Row`` objects.
    """
    conn = getattr(g, '_database', None)
    if conn is not None:
        return conn
    # Nothing cached yet: open the connection and remember it on g.
    conn = sqlite3.connect(os.environ['DATABASE'])
    g._database = conn
    conn.row_factory = sqlite3.Row
    return conn
def query_db(query, args=(), one=False):
    """Execute ``query`` with ``args`` and return the fetched rows.

    With ``one=True`` only the first row is returned, or ``None`` when the
    result set is empty; otherwise the full list of rows is returned.
    """
    cursor = get_db().execute(query, args)
    rows = cursor.fetchall()
    cursor.close()
    if not one:
        return rows
    return rows[0] if rows else None
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection cached on ``g``, if one was opened."""
    conn = getattr(g, '_database', None)
    if conn is None:
        return
    conn.close()
@app.route('/')
def index():
    """Serve the fixed banner string at the site root."""
    banner = "api-server"
    return banner
@app.route('/api/me')
def me():
    """Report the ``uid`` stored in the session, or ``None`` when it is unset."""
    uid = session.get('uid')
    return jsonify(userid=uid if uid else None)
@app.route('/api/login', methods=['POST'])
def login():
    """Check the posted ``userid``/``password`` and store the user in the session.

    The password is compared as a SHA-256 hex digest. Note that the SQL text
    is assembled with an f-string from the posted ``userid`` instead of being
    bound as a query parameter.
    """
    userid = request.form.get('userid', '')
    password = request.form.get('password', '')
    if not (userid and password):
        return jsonify(result="fail")
    ret = query_db(f"SELECT * FROM users where userid='{userid}' and password='{hashlib.sha256(password.encode()).hexdigest()}'" , one=True)
    if not ret:
        return jsonify(result="fail")
    # The first column of the matched row becomes the session uid.
    session['uid'] = ret[0]
    return jsonify(result="success", userid=ret[0])
@app.route('/api/logout')
def logout():
    """Drop ``uid`` from the session (if present) and report success."""
    session.pop('uid', None)
    response = jsonify(result="success")
    return response
@app.route('/api/join', methods=['POST'])
def join():
    """Insert a new row into ``users`` from the posted ``userid``/``password``.

    The password is stored as a SHA-256 hex digest. Responds with ``error``
    when either field is empty.
    """
    userid = request.form.get('userid', '')
    password = request.form.get('password', '')
    if not userid or not password:
        return jsonify(result="error")
    pw_hash = hashlib.sha256(password.encode()).hexdigest()
    db = get_db()
    db.cursor().execute("Insert into users values(?, ?);", (userid, pw_hash))
    db.commit()
    return jsonify(result="success")
@app.route('/api/memo/add', methods=['PUT'])
def memoAdd():
    """Insert a memo owned by the session user and return its row id.

    Responds with ``no login`` when the session has no ``uid`` and with
    ``error`` when ``title`` or ``contents`` is missing.
    """
    userid = session.get('uid')
    if not userid:
        return jsonify(result="no login")
    title = request.form.get('title')
    contents = request.form.get('contents')
    if not (title and contents):
        return jsonify(result="error")
    db = get_db()
    inserted = db.cursor().execute(
        "Insert into memo(userid, title, contents) values(?, ?, ?);",
        (userid, title, contents),
    )
    db.commit()
    return jsonify(result="success", memoidx=inserted.lastrowid)
@app.route('/api/memo/<idx>', methods=['GET'])
def memoView(idx):
    """Return one memo as JSON (default) or as HTML when ``mode=html``.

    In HTML mode the memo fields are passed to ``render_template_string`` as
    template variables. Note that ``idx`` is still concatenated into the SQL
    text.
    """
    mode = request.args.get('mode', 'json')
    ret = query_db("SELECT * FROM memo where idx=" + idx)[0]
    if not ret:
        return jsonify(result="error")
    fields = {key: ret[key] for key in ('userid', 'title', 'contents')}
    if mode != 'html':
        return jsonify(result="success", **fields)
    # The template source is constant; memo values only arrive as variables.
    template = ''' Written by {{userid}}<h3>{{title}}</h3>
<pre>{{contents}}</pre>
'''
    return render_template_string(template, **fields)
@app.route('/api/memo/<int:idx>', methods=['PUT'])
def memoUpdate(idx):
    """Overwrite the title and contents of memo ``idx`` for its owner only.

    Responds with ``no login`` when the session has no ``uid``, and with
    ``error`` when the memo does not exist, belongs to a different user, or
    ``title``/``contents`` is missing. Responds with ``success`` after the
    update is committed.
    """
    userid = session.get('uid')
    if not userid:
        return jsonify(result="no login")
    # one=True yields None for an unknown idx instead of raising IndexError,
    # so the ownership check below never dereferences a missing row.
    memo = query_db('SELECT * FROM memo where idx=?', [idx], one=True)
    if memo is None or memo['userid'] != userid:
        return jsonify(result="error")
    title = request.form.get('title')
    contents = request.form.get('contents')
    if not (title and contents):
        return jsonify(result="error")
    db = get_db()
    db.cursor().execute(
        "UPDATE memo SET title=?, contents=? WHERE idx=?",
        (title, contents, idx),
    )
    db.commit()
    return jsonify(result="success")
sql injection 취약점은 어떻게 막아야 하지 ...?
근데 쿼리를 잘 보면 대부분 '?' 을 사용하여 포맷팅을 하는데
query_db(f"SELECT * FROM users where userid='{userid}' and password='{hashlib.sha256(password.encode()).hexdigest()}'" , one=True)
여기만 눈에 잘 띈다. 검색을 해보니 ?를 통해 받아야 안전하다고 한다.
따라서 해당 부분을
query_db("SELECT * FROM users where userid = ? and password = ?" , (userid, password), one=True)
이렇게 바꾸었다.
근데 지금까지 잘 패치된 내용들도 에러가 났고, SLA도 실패했다. 그런데 SqlI는 또 막았다 ...?
어, 그런데 비밀번호를 SHA-256으로 해싱(hashlib.sha256)하지 않고 그대로 넘겨주었다. 아, 이 부분을 틀렸구나.
#!/usr/bin/python3
from flask import Flask, request, render_template_string, g, session, jsonify
import sqlite3
import os, hashlib
# Create the Flask application object for this module.
app = Flask(__name__)
# The secret key is 32 bytes from os.urandom, generated anew each time this
# module is loaded.
# NOTE(review): sessions issued under an earlier key presumably stop
# validating after a restart — confirm that is acceptable.
app.secret_key = os.urandom(32)
def get_db():
    """Return the SQLite connection cached on ``g``, opening it on first use.

    The database path is read from the ``DATABASE`` environment variable.
    Rows are returned as ``sqlite3.Row`` objects.
    """
    conn = getattr(g, '_database', None)
    if conn is not None:
        return conn
    # Nothing cached yet: open the connection and remember it on g.
    conn = sqlite3.connect(os.environ['DATABASE'])
    g._database = conn
    conn.row_factory = sqlite3.Row
    return conn
def query_db(query, args=(), one=False):
    """Execute ``query`` with ``args`` and return the fetched rows.

    With ``one=True`` only the first row is returned, or ``None`` when the
    result set is empty; otherwise the full list of rows is returned.
    """
    cursor = get_db().execute(query, args)
    rows = cursor.fetchall()
    cursor.close()
    if not one:
        return rows
    return rows[0] if rows else None
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection cached on ``g``, if one was opened."""
    conn = getattr(g, '_database', None)
    if conn is None:
        return
    conn.close()
@app.route('/')
def index():
    """Serve the fixed banner string at the site root."""
    banner = "api-server"
    return banner
@app.route('/api/me')
def me():
    """Report the ``uid`` stored in the session, or ``None`` when it is unset."""
    uid = session.get('uid')
    return jsonify(userid=uid if uid else None)
@app.route('/api/login', methods=['POST'])
def login():
    """Check the posted ``userid``/``password`` and store the user in the session.

    Both the user id and the SHA-256 hex digest of the password are bound as
    query parameters. Responds with ``fail`` when a field is empty or no row
    matches.
    """
    userid = request.form.get('userid', '')
    password = request.form.get('password', '')
    if not (userid and password):
        return jsonify(result="fail")
    pw_hash = hashlib.sha256(password.encode()).hexdigest()
    row = query_db(
        "SELECT * FROM users where userid = ? and password = ?",
        (userid, pw_hash),
        one=True,
    )
    if not row:
        return jsonify(result="fail")
    # The first column of the matched row becomes the session uid.
    session['uid'] = row[0]
    return jsonify(result="success", userid=row[0])
@app.route('/api/logout')
def logout():
    """Drop ``uid`` from the session (if present) and report success."""
    session.pop('uid', None)
    response = jsonify(result="success")
    return response
@app.route('/api/join', methods=['POST'])
def join():
    """Insert a new row into ``users`` from the posted ``userid``/``password``.

    The password is stored as a SHA-256 hex digest. Responds with ``error``
    when either field is empty.
    """
    userid = request.form.get('userid', '')
    password = request.form.get('password', '')
    if not userid or not password:
        return jsonify(result="error")
    pw_hash = hashlib.sha256(password.encode()).hexdigest()
    db = get_db()
    db.cursor().execute("Insert into users values(?, ?);", (userid, pw_hash))
    db.commit()
    return jsonify(result="success")
@app.route('/api/memo/add', methods=['PUT'])
def memoAdd():
    """Insert a memo owned by the session user and return its row id.

    Responds with ``no login`` when the session has no ``uid`` and with
    ``error`` when ``title`` or ``contents`` is missing.
    """
    userid = session.get('uid')
    if not userid:
        return jsonify(result="no login")
    title = request.form.get('title')
    contents = request.form.get('contents')
    if not (title and contents):
        return jsonify(result="error")
    db = get_db()
    inserted = db.cursor().execute(
        "Insert into memo(userid, title, contents) values(?, ?, ?);",
        (userid, title, contents),
    )
    db.commit()
    return jsonify(result="success", memoidx=inserted.lastrowid)
@app.route('/api/memo/<idx>', methods=['GET'])
def memoView(idx):
    """Return one memo as JSON (default) or as HTML when ``mode=html``.

    ``idx`` is bound as a query parameter, so it is never spliced into the
    SQL text. Responds with ``error`` when no memo matches. In HTML mode the
    memo fields are passed to ``render_template_string`` as template
    variables rather than being formatted into the template source.
    """
    mode = request.args.get('mode', 'json')
    # one=True yields None for an empty result instead of raising IndexError.
    memo = query_db('SELECT * FROM memo where idx=?', [idx], one=True)
    if memo is None:
        return jsonify(result="error")
    fields = {key: memo[key] for key in ('userid', 'title', 'contents')}
    if mode == 'html':
        # The template source is constant; memo values only arrive as variables.
        template = ''' Written by {{userid}}<h3>{{title}}</h3>
<pre>{{contents}}</pre>
'''
        return render_template_string(template, **fields)
    return jsonify(result="success", **fields)
@app.route('/api/memo/<int:idx>', methods=['PUT'])
def memoUpdate(idx):
    """Overwrite the title and contents of memo ``idx`` for its owner only.

    Responds with ``no login`` when the session has no ``uid``, and with
    ``error`` when the memo does not exist, belongs to a different user, or
    ``title``/``contents`` is missing. Responds with ``success`` after the
    update is committed.
    """
    userid = session.get('uid')
    if not userid:
        return jsonify(result="no login")
    # one=True yields None for an unknown idx instead of raising IndexError,
    # so the ownership check below never dereferences a missing row.
    memo = query_db('SELECT * FROM memo where idx=?', [idx], one=True)
    if memo is None or memo['userid'] != userid:
        return jsonify(result="error")
    title = request.form.get('title')
    contents = request.form.get('contents')
    if not (title and contents):
        return jsonify(result="error")
    db = get_db()
    db.cursor().execute(
        "UPDATE memo SET title=?, contents=? WHERE idx=?",
        (title, contents, idx),
    )
    db.commit()
    return jsonify(result="success")
어디서 SqlI가 계속 터지는 거지 ?
아놔
ret = query_db("SELECT * FROM memo where idx=" + idx)[0]
이 부분이 계속 있었다.
ret = query_db('SELECT * FROM memo where idx=?', [idx,])[0]
이렇게 바꾸어주자...
