Files
Tg-job/mtucijobsbackend/app/crud.py

150 lines
5.5 KiB
Python

from fastapi import status, HTTPException, Depends, BackgroundTasks
from sqlalchemy.orm import Session
from typing import List
from sqlalchemy import func
from . import models
from typing import List, Dict
from datetime import date
import requests
def get_group_pattern(year):
today = date.today()
current_year = int(today.strftime("%Y"))
current_month = int(today.strftime("%m"))
if current_month > 5:
year -= 1
if year <= 0:
return "_"
pattern = "^[:alpha:]"
for i in range(year):
pattern += "|" + str(current_year % 100 - i)
return pattern
def send_webhook_background(event_type: str, data: Dict):
webhook_url = "http://bot:3006/webhook"
payload = {
"event": event_type,
"data": data
}
try:
response = requests.post(webhook_url, json=payload)
response.raise_for_status()
except requests.RequestException as e:
print(f"Failed to send webhook: {e}")
def send_webhook_response_updated(data: Dict):
event_type = "response_updated"
send_webhook_background(event_type, data)
def update_hardskills(entity_id: int, hardskills: List[str], is_job: bool, db: Session):
model = models.JobsHard_skills if is_job else models.StudentsHard_skills
id_field = 'JobID' if is_job else 'StudentID'
# Удаление существующих навыков
db.query(model).filter(getattr(model, id_field) == entity_id).delete()
db.flush()
# Добавление новых навыков
hardskills_query = db.query(models.Hard_skills).filter(models.Hard_skills.Title.in_(hardskills))
new_hardskills = [model(**{id_field: entity_id, 'Hard_skillID': hardskill.Hard_skillID}) for hardskill in hardskills_query]
db.add_all(new_hardskills)
db.flush()
def update_matches(entity_id: int, is_job: bool, db: Session, background_tasks: BackgroundTasks) -> List[Dict]:
if is_job:
matches = db.query(
models.JobsHard_skills.JobID,
models.StudentsHard_skills.StudentID,
func.count(models.StudentsHard_skills.Hard_skillID).label("count_students_skills")
).join(models.JobsHard_skills, models.JobsHard_skills.Hard_skillID == models.StudentsHard_skills.Hard_skillID).\
filter(models.JobsHard_skills.JobID == entity_id).\
group_by(models.JobsHard_skills.JobID, models.StudentsHard_skills.StudentID).\
all()
count_jobs_skills = db.query(func.count(models.JobsHard_skills.Hard_skillID)).filter(models.JobsHard_skills.JobID == entity_id).scalar()
db.query(models.Matches).filter(models.Matches.JobID == entity_id).delete()
else:
matches = db.query(
models.StudentsHard_skills.StudentID,
models.JobsHard_skills.JobID,
func.count(models.StudentsHard_skills.Hard_skillID).label("count_students_skills")
).join(models.Jobs, models.Jobs.JobID == models.JobsHard_skills.JobID).\
join(models.StudentsHard_skills, models.JobsHard_skills.Hard_skillID == models.StudentsHard_skills.Hard_skillID).\
filter(models.Jobs.Archive == False, models.StudentsHard_skills.StudentID == entity_id).\
group_by(models.StudentsHard_skills.StudentID, models.JobsHard_skills.JobID).\
all()
db.query(models.Matches).filter(models.Matches.StudentID == entity_id).delete()
db.flush()
updated_matches = []
for match in matches:
if is_job:
MATCH = match.count_students_skills / count_jobs_skills * 100
updated_matches.append({
"job_id": match.JobID,
"student_id": match.StudentID,
"match_percentage": MATCH
})
else:
count_jobs_skills = db.query(func.count(models.JobsHard_skills.Hard_skillID)).filter(models.JobsHard_skills.JobID == match.JobID).scalar()
MATCH = match.count_students_skills / count_jobs_skills * 100
updated_matches.append({
"job_id": match.JobID,
"student_id": match.StudentID,
"match_percentage": MATCH
})
new_match = models.Matches(StudentID=match.StudentID, JobID=match.JobID, Match=MATCH)
db.add(new_match)
db.flush()
if updated_matches:
background_tasks.add_task(send_webhook_background, "matches_updated", {
"entity_type": "job" if is_job else "student",
"entity_id": entity_id,
"updated_matches": updated_matches
})
return updated_matches
def update_record(model, model_id: int, updated_data: dict, db: Session, background_tasks: BackgroundTasks):
query = db.query(model).filter(model.ResponseID == model_id)
record = query.first()
if not record:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
query.update(updated_data, synchronize_session=False)
db.flush()
commit_transaction(db)
post_update_data = {
"response_id": record.ResponseID,
"student_id": record.StudentID,
"job_id": record.JobID,
"status": record.Status,
"comment": record.Comment,
"link": record.Link
}
if post_update_data:
background_tasks.add_task(send_webhook_response_updated, post_update_data)
return query.first()
def commit_transaction(db):
try:
db.commit()
except:
db.rollback()
raise HTTPException(status_code=status.HTTP_409_CONFLICT)