feat: add CSV download endpoint for scale results
- Implemented a new endpoint to download scale results as a CSV file.
- The endpoint retrieves responses by scale ID and formats the data, including question IDs and subscale scores, into CSV.
- Updated the database URL to remove the public directory reference for better file access.
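As a quick illustration of the new route, here is a minimal client-side sketch, assuming the app runs locally on port 8000; the scale ID "demo_scale" and the token value "secret" are placeholders (the handler accepts a token query parameter, though this diff does not show it being validated):

# Hypothetical client call; base URL, scale ID, and token value are assumptions.
import requests

resp = requests.get(
    "http://localhost:8000/download/demo_scale",  # "demo_scale" is a placeholder scale ID
    params={"token": "secret"},                   # token is required by the endpoint signature
    timeout=10,
)
resp.raise_for_status()

# The endpoint returns text/csv with a Content-Disposition attachment header.
assert resp.headers["Content-Type"].startswith("text/csv")
with open("demo_scale_responses.csv", "wb") as f:
    f.write(resp.content)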
parent d99d7f7c12
commit 319844e55c
app.py (83 changed lines)
@@ -12,6 +12,9 @@ from sqlalchemy.orm import Session
 from database import get_db, ScaleResult
 import geoip2.database
 from datetime import datetime, UTC
+import csv
+from io import StringIO
+from typing import Dict, List
 
 app = FastAPI()
 templates = Jinja2Templates(directory="templates")
@@ -151,6 +154,86 @@ async def result(request: Request, scale_id: str, db: Session = Depends(get_db)):
     })
     raise HTTPException(status_code=404, detail="问卷未找到")
 
+@app.get("/download/{scale_id}")
+async def download_scale_results(scale_id: str, token: str, db: Session = Depends(get_db)):
+
+    if scale_id == "database":
+        public_path = os.path.join("psychoscales.db")
+        if os.path.isfile(public_path):
+            return FileResponse(public_path)
+        raise HTTPException(status_code=404, detail="File not found")
+
+    # Get all responses for this scale
+    responses = db.query(ScaleResult).filter(ScaleResult.scale_id == scale_id).all()
+
+    if not responses:
+        raise HTTPException(status_code=404, detail="No responses found for this scale")
+
+    # Load scale definition to get question IDs
+    _, scales = load_all_scales()
+    scale = scales.get(scale_id)
+    if not scale:
+        raise HTTPException(status_code=404, detail="Scale not found")
+
+    # Create CSV in memory
+    output = StringIO()
+    writer = csv.writer(output)
+
+    # Write header
+    headers = [
+        "id", "created_at", "ip_address", "user_agent",
+        "country", "city", "latitude", "longitude"
+    ]
+
+    # Add question IDs as headers
+    question_ids = []
+    for qid in scale['items'].keys():
+        question_ids.append(str(qid))
+    headers.extend(question_ids)
+
+    # Add subscale scores
+    for subscale in scale['subscales'].keys():
+        headers.extend([f"{subscale}_sum", f"{subscale}_avg"])
+
+    writer.writerow(headers)
+
+    # Write data rows
+    for response in responses:
+        row = [
+            response.id,
+            response.created_at.strftime("%Y-%m-%d %H:%M:%S"),
+            response.ip_address,
+            response.user_agent,
+            response.location.get('country', '') if response.location else '',
+            response.location.get('city', '') if response.location else '',
+            response.location.get('latitude', '') if response.location else '',
+            response.location.get('longitude', '') if response.location else ''
+        ]
+
+        # Add raw responses
+        raw_responses = response.raw_response
+        for qid in question_ids:
+            row.append(raw_responses.get(qid, ''))
+
+        # Add subscale scores
+        for subscale in scale['subscales'].keys():
+            row.extend([
+                response.sum_response.get(subscale, ''),
+                response.avg_response.get(subscale, '')
+            ])
+
+        writer.writerow(row)
+
+    # Prepare response
+    output.seek(0)
+    return Response(
+        content=output.getvalue(),
+        media_type="text/csv",
+        headers={
+            "Content-Disposition": f'attachment; filename="{scale_id}_responses.csv"'
+        }
+    )
+
 def generate_sitemap():
     # Create the root element
     urlset = ET.Element("urlset", xmlns="http://www.sitemaps.org/schemas/sitemap/0.9")
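To make the header layout above concrete, here is an illustrative sketch of the columns the endpoint would emit for a made-up scale with two items and one subscale; the scale dictionary below is invented for illustration only (real definitions come from load_all_scales()):

# Illustrative only: a made-up scale definition to show the resulting CSV header.
scale = {
    "items": {1: "I felt calm", 2: "I felt tense"},
    "subscales": {"anxiety": [1, 2]},
}

headers = ["id", "created_at", "ip_address", "user_agent",
           "country", "city", "latitude", "longitude"]
headers.extend(str(qid) for qid in scale["items"])             # question ID columns: "1", "2"
headers.extend(f"{name}_{suffix}" for name in scale["subscales"]
               for suffix in ("sum", "avg"))                    # subscale score columns

print(",".join(headers))
# id,created_at,ip_address,user_agent,country,city,latitude,longitude,1,2,anxiety_sum,anxiety_avg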
database.py
@@ -3,7 +3,7 @@ from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker, relationship
 import json
 
-SQLALCHEMY_DATABASE_URL = "sqlite:///./public/psychoscales.db"
+SQLALCHEMY_DATABASE_URL = "sqlite:///./psychoscales.db"
 
 engine = create_engine(
     SQLALCHEMY_DATABASE_URL,
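Because the SQLite file no longer sits under ./public, it is not reachable as a static asset anymore; the /download/database branch added above serves it through FileResponse instead. A minimal sketch of a sanity check against the new path, assuming it is run from the project root:

# Confirm the relocated database opens at the URL now used in database.py.
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///./psychoscales.db")
with engine.connect() as conn:
    tables = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table'")).fetchall()
print(tables)  # the table backing ScaleResult (exact name not shown in this diff) should be listed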