Add sort and filter (aggregation)

Co-authored-by: Ezequiel Bellver <ebellver@itba.edu.ar>
Co-authored-by: Juan Barmasch <jbarmasch@itba.edu.ar>
This commit is contained in:
Santiago Lo Coco 2022-12-12 23:42:22 -03:00
parent 333e7fd900
commit 9ffd45dfb6
3 changed files with 223 additions and 207 deletions

View File

@ -2,6 +2,7 @@ from fastapi import APIRouter
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from bsition.api.models.table import Filter, Sort, Table from bsition.api.models.table import Filter, Sort, Table
from bsition.backend.mongo import tables as mongo
from bsition.backend.postgres import tables as postgres from bsition.backend.postgres import tables as postgres
router = APIRouter() router = APIRouter()
@ -9,24 +10,24 @@ router = APIRouter()
@router.post("") @router.post("")
def create_table(aux: Table): def create_table(aux: Table):
postgres.create_table(aux.name) mongo.create_table(aux.name)
return JSONResponse(content={"detail": "Table created."}, status_code=201) return JSONResponse(content={"detail": "Table created."}, status_code=201)
@router.put("/{name}") @router.put("/{name}")
def edit_table(aux: Table, name: str): def edit_table(aux: Table, name: str):
if aux.column is not None and aux.type is not None: if aux.column is not None and aux.type is not None:
postgres.add_column(name, aux.column, aux.type) mongo.add_column(name, aux.column, aux.type)
if aux.column_data is not None: if aux.column_data is not None:
postgres.insert_columns(name, aux.column_data) mongo.insert_columns(name, aux.column_data)
if aux.row_number is not None: if aux.row_number is not None:
postgres.edit_columns(name, aux.columns, aux.columns_data, aux.row_number) mongo.edit_columns(name, aux.columns, aux.columns_data, aux.row_number)
return JSONResponse(content={"detail": "Table updated."}, status_code=202) return JSONResponse(content={"detail": "Table updated."}, status_code=202)
@router.post("/{name}/sort") @router.post("/{name}/sort")
def create_sort(name: str): def create_sort(name: str):
postgres.create_sort(name) postgres.create_sort()
return JSONResponse(content={"detail": "Sort created."}, status_code=201) return JSONResponse(content={"detail": "Sort created."}, status_code=201)
@ -38,13 +39,13 @@ def add_sort(aux: Sort, name: str):
@router.get("/{name}/sort") @router.get("/{name}/sort")
def sort(name: str): def sort(name: str):
return postgres.sort(name) return mongo.sort(name)
@router.post("/{name}/filter") @router.post("/{name}/filter")
def create_filter(name: str): def create_filter(name: str):
postgres.create_filter(name) postgres.create_filter()
postgres.add_filter_trigger(name) # postgres.add_filter_trigger(name)
return JSONResponse(content={"detail": "Filter created."}, status_code=201) return JSONResponse(content={"detail": "Filter created."}, status_code=201)
@ -56,4 +57,4 @@ def add_filter(aux: Filter, name: str):
@router.get("/{name}/filter") @router.get("/{name}/filter")
def filter(name: str): def filter(name: str):
return postgres.filter(name) return mongo.filter(name)

View File

@ -0,0 +1,126 @@
from bsition.backend.mongo.utils import get_database
from bsition.backend.postgres import tables as postgres
def create_table(table):
dbname = get_database()
docs_coll = dbname["tables"]
docs_coll.insert_one(table)
def insert_columns(id, name, type, data):
dbname = get_database()
docs_coll = dbname["tables"]
doc = docs_coll.find_one({"_id": id}, {"_id": 0})
new_column_names = doc["column_names"].copy()
new_column_names.append(name)
new_types = doc["types"].copy()
new_types.append(type)
docs_coll.update_one({"_id": id}, {"$set": {
"column_names": new_column_names, "types": new_types}
})
def edit_column(row_number, column, data, id):
dbname = get_database()
docs_coll = dbname["tables"]
doc = docs_coll.find_one({"_id": id}, {"_id": 0})
column_names = doc["column_names"]
new_data = doc["data"].copy()
i = column_names.index(column)
new_data[row_number]["column_data"][i] = data
docs_coll.update_one({"_id": id}, {"$set": {
"data": new_data}
})
def sort(id):
dbname = get_database()
docs_coll = dbname["tables"]
doc = docs_coll.find_one({"_id": id}, {"_id": 0})
column_names = doc["column_names"]
sort_values = postgres.get_sort(id)
obj = {}
for sort in sort_values:
obj['data.column_data.' + str(column_names.index(sort[1]))] = 1 if sort[2] == 'ASC' else -1
pipeline = [
{"$match": {"_id": id}},
{"$unwind": '$data'},
{"$sort": obj},
{"$group": {"_id": '$_id', 'aux': {"$push": '$data'}}},
{"$project": {'data': '$aux'}}
]
return list(docs_coll.aggregate(pipeline))
def filter(id):
dbname = get_database()
docs_coll = dbname["tables"]
doc = docs_coll.find_one({"_id": id}, {"_id": 0})
column_names = doc["column_names"]
filter_values = postgres.get_filter(id)
obj = {}
for filter in filter_values:
match filter[3]:
case "e":
obj['data.column_data.' + str(column_names.index(filter[1]))] = {
"$eq": filter[2]
}
case "ne":
obj['data.column_data.' + str(column_names.index(filter[1]))] = {
"$ne": filter[2]
}
case "le":
obj['data.column_data.' + str(column_names.index(filter[1]))] = {
"$lte": filter[2]
}
case "ge":
obj['data.column_data.' + str(column_names.index(filter[1]))] = {
"$gte": filter[2]
}
case "l":
obj['data.column_data.' + str(column_names.index(filter[1]))] = {
"$lt": filter[2]
}
case "g":
obj['data.column_data.' + str(column_names.index(filter[1]))] = {
"$gt": filter[2]
}
case "c":
obj['data.column_data.' + str(column_names.index(filter[1]))] = {
"$regex": ".*" + filter[2] + ".*", "$options": "i"
}
case "_":
raise "Invalid filter function"
pipeline = [
{"$match": {"_id": id}},
{"$unwind": '$data'},
{"$match": obj},
{"$group": {"_id": '$_id', 'aux': {"$push": '$data'}}},
{"$project": {'data': '$aux'}}
]
return list(docs_coll.aggregate(pipeline))
def remove_column(id, column):
dbname = get_database()
docs_coll = dbname["tables"]
doc = docs_coll.find_one({"_id": id}, {"_id": 0})
column_names = doc["column_names"]
types = doc["types"]
idx = column_names.index(column)
del column_names[idx]
del types[idx]
data = doc["data"]
for row in data:
del row["column_data"][idx]
docs_coll.update_one({"_id": id}, {"$set": {
"column_names": column_names, "types": types, "data": data}
})

View File

@ -3,223 +3,112 @@ from psycopg2 import sql
from bsition.backend.postgres.utils import get_connection from bsition.backend.postgres.utils import get_connection
def create_table(name): def create_sort():
conn = get_connection()
cur = conn.cursor()
cur.execute(
sql.SQL("CREATE TABLE {table} (row_number SERIAL PRIMARY KEY)").format(
table=sql.Identifier(name)
)
)
conn.commit()
def add_column(name, column, type):
conn = get_connection()
cur = conn.cursor()
cur.execute(
sql.SQL("ALTER TABLE {table} ADD {column}" + type).format(
table=sql.Identifier(name), column=sql.Identifier(column)
)
)
conn.commit()
def insert_columns(name, data):
conn = get_connection()
cur = conn.cursor()
str = "(" + "DEFAULT" + ", %s" * (len(data) - 1) + ", %s" + ")" # TODO: change.
print(str)
cur.execute(
sql.SQL("INSERT INTO {table} VALUES" + str).format(table=sql.Identifier(name)),
data,
)
conn.commit()
def edit_columns(name, columns, data, id):
conn = get_connection()
cur = conn.cursor()
i = 0
print(columns, data, id)
for column in columns:
cur.execute(
sql.SQL("UPDATE {table} SET {col} = %s WHERE row_number = {id}").format(
table=sql.Identifier(name),
col=sql.Identifier(column),
id=sql.Literal(id),
),
[data[i]],
)
i += 1
conn.commit()
def remove_column(name, column):
conn = get_connection()
cur = conn.cursor()
cur.execute(
sql.SQL("ALTER TABLE {table} DROP COLUMN {column}").format(
table=sql.Identifier(name), column=sql.Identifier(column)
)
)
conn.commit()
def create_sort(name):
conn = get_connection() conn = get_connection()
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute(
sql.SQL( sql.SQL(
"CREATE TABLE {table} (property TEXT, _order CHAR(3), priority int)" "CREATE TABLE t_sort (table_id INTEGER, property TEXT, _order CHAR(3), priority int)"
).format(table=sql.Identifier(name + "_sort"))
)
conn.commit()
def add_sort(name, property, order, priority):
conn = get_connection()
cur = conn.cursor()
cur.execute(
sql.SQL("INSERT INTO {table} VALUES (%s, %s, %s)").format(
table=sql.Identifier(name + "_sort")
),
(property, order, priority),
)
conn.commit()
def sort(name):
conn = get_connection()
cur = conn.cursor()
cur.execute(
sql.SQL("SELECT * FROM {table} ORDER BY priority").format(
table=sql.Identifier(name + "_sort")
),
)
order_clause = "ORDER BY "
i = 0
for sort in cur:
if i > 0:
order_clause += ", "
order_clause += sort[0] + " " + sort[1]
i += 1
cur.execute(
sql.SQL("SELECT * FROM {table} " + order_clause).format(
table=sql.Identifier(name)
),
)
return list(cur.fetchall())
def add_function():
conn = get_connection()
cur = conn.cursor()
cur.execute(
"""
CREATE OR REPLACE FUNCTION trigger_function()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS $$
DECLARE
name text := TG_ARGV[0]::text;
BEGIN
IF NEW.property NOT IN (
SELECT column_name
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = name)
THEN
RAISE EXCEPTION 'ERROR %', NEW.property;
END IF;
RETURN NEW;
END;
$$;
"""
)
conn.commit()
def add_filter_trigger(name):
conn = get_connection()
cur = conn.cursor()
cur.execute(
sql.SQL(
"""
CREATE TRIGGER {filter}
BEFORE INSERT OR UPDATE
ON {filter}
FOR EACH ROW
EXECUTE PROCEDURE trigger_function({table});
"""
).format(table=sql.Identifier(name), filter=sql.Identifier(name + "_filter"))
)
conn.commit()
def create_filter(name):
conn = get_connection()
cur = conn.cursor()
cur.execute(
sql.SQL(
"""
CREATE TABLE {table} (
property TEXT,
value TEXT,
function TEXT CHECK (function IN ('c', 'e', 'n'))
) )
"""
).format(table=sql.Identifier(name + "_filter"))
) )
conn.commit() conn.commit()
def add_filter(name, property, value, function): def add_sort(id, property, order, priority):
conn = get_connection() conn = get_connection()
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute(
sql.SQL("INSERT INTO {table} VALUES (%s, %s, %s)").format( sql.SQL("INSERT INTO t_sort VALUES (%s, %s, %s, %s)"),
table=sql.Identifier(name + "_filter") (id, property, order, priority),
),
(property, value, function),
) )
conn.commit() conn.commit()
def filter(name): def get_sort(id):
conn = get_connection() conn = get_connection()
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute(
sql.SQL("SELECT * FROM {table}").format(table=sql.Identifier(name + "_filter")), sql.SQL("SELECT * FROM t_sort WHERE table_id = {id} ORDER BY priority").format(
id=sql.Literal(id),
)
) )
filter_clause = "WHERE " return cur.fetchall()
i = 0
for sort in cur:
if i > 0: # def add_function():
filter_clause += " AND " # conn = get_connection()
filter_clause += sort[0] # cur = conn.cursor()
match sort[2]: # cur.execute(
case "e": # """
filter_clause += " = '" + sort[1] + "'" # CREATE OR REPLACE FUNCTION trigger_function()
case "ne": # RETURNS TRIGGER
filter_clause += " <> '" + sort[1] + "'" # LANGUAGE PLPGSQL
case "le": # AS $$
filter_clause += " <= " + sort[1] # DECLARE
case "ge": # name text := TG_ARGV[0]::text;
filter_clause += " >= " + sort[1] # BEGIN
case "l": # IF NEW.property NOT IN (
filter_clause += " < " + sort[1] # SELECT column_name
case "g": # FROM INFORMATION_SCHEMA.COLUMNS
filter_clause += " > " + sort[1] # WHERE TABLE_NAME = name)
case "c": # THEN
filter_clause += " ILIKE '%" + sort[1] + "'" # RAISE EXCEPTION 'ERROR %', NEW.property;
case "_": #
raise "Invalid filter function" # END IF;
i += 1 #
# RETURN NEW;
# END;
# $$;
# """
# )
# conn.commit()
#
#
# def add_filter_trigger(name):
# conn = get_connection()
# cur = conn.cursor()
# cur.execute(
# sql.SQL(
# """
# CREATE TRIGGER {filter}
# BEFORE INSERT OR UPDATE
# ON {filter}
# FOR EACH ROW
# EXECUTE PROCEDURE trigger_function({table});
# """
# ).format(table=sql.Identifier(name), filter=sql.Identifier(name + "_filter"))
# )
# conn.commit()
def create_filter():
conn = get_connection()
cur = conn.cursor()
cur.execute( cur.execute(
sql.SQL("SELECT * FROM {table} " + filter_clause).format( sql.SQL(
table=sql.Identifier(name) "CREATE TABLE t_filter (table_id INTEGER, property TEXT, value TEXT, function TEXT CHECK (function IN ('c', 'e', 'ne', 'le', 'ge', 'l', 'g')))"
), )
) )
return list(cur.fetchall()) conn.commit()
def add_filter(id, property, value, function):
conn = get_connection()
cur = conn.cursor()
cur.execute(
sql.SQL("INSERT INTO t_filter VALUES (%s, %s, %s, %s)").format(
),
(id, property, value, function),
)
conn.commit()
def get_filter(id):
conn = get_connection()
cur = conn.cursor()
cur.execute(
sql.SQL("SELECT * FROM t_filter WHERE table_id = {id}").format(
id=sql.Literal(id),
)
)
return cur.fetchall()