from datetime import date
from typing import Dict, List

import requests
from fastapi import BackgroundTasks, Depends, HTTPException, status
from sqlalchemy import func
from sqlalchemy.orm import Session

from . import models

# URL that webhook notifications are POSTed to.
WEBHOOK_URL = "http://bot:3006/webhook"
# Upper bound (seconds) for a single webhook call, so that an unresponsive
# endpoint cannot block the worker running the background task forever.
WEBHOOK_TIMEOUT_SECONDS = 10


def get_group_pattern(year: int) -> str:
    """Build an alternation pattern of two-digit year numbers.

    The pattern starts with ``^[:alpha:]`` and is followed by ``year``
    alternatives: the last two digits of the current calendar year, then
    each preceding year.  From June onwards (month > 5) ``year`` is reduced
    by one before the pattern is built.

    NOTE(review): presumably the result is used as a regex over student
    group names that start with the admission year -- confirm with caller.
    NOTE(review): POSIX character classes are normally written inside a
    bracket expression (``[[:alpha:]]``); the literal is left untouched
    because it is runtime data -- confirm the intended pattern.

    Args:
        year: Number of year alternatives requested.

    Returns:
        ``"_"`` when the (possibly reduced) ``year`` is not positive,
        otherwise the ``|``-separated pattern described above.
    """
    today = date.today()
    if today.month > 5:
        year -= 1
    if year <= 0:
        return "_"

    current_suffix = today.year % 100
    suffixes = (str(current_suffix - offset) for offset in range(year))
    return "|".join(["^[:alpha:]", *suffixes])


def send_webhook_background(event_type: str, data: Dict):
    """POST an event to the webhook endpoint, reporting failures via print.

    Any ``requests.RequestException`` (connection error, timeout, non-2xx
    status) is printed and swallowed, so a failing webhook never propagates
    to the caller.

    Args:
        event_type: Value sent under the ``"event"`` key.
        data: JSON-serialisable payload sent under the ``"data"`` key.
    """
    payload = {"event": event_type, "data": data}
    try:
        # Without an explicit timeout requests waits indefinitely.
        response = requests.post(
            WEBHOOK_URL, json=payload, timeout=WEBHOOK_TIMEOUT_SECONDS
        )
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"Failed to send webhook: {e}")


def send_webhook_response_updated(data: Dict):
    """Send ``data`` to the webhook as a ``"response_updated"`` event."""
    send_webhook_background("response_updated", data)


def update_hardskills(entity_id: int, hardskills: List[str], is_job: bool, db: Session):
    """Replace the hard-skill links of a job or a student.

    All existing link rows of the entity are deleted, then one link row is
    added for every ``Hard_skills`` row whose ``Title`` is in ``hardskills``.
    Titles with no matching ``Hard_skills`` row are ignored.  Changes are
    flushed but not committed.

    Args:
        entity_id: ``JobID`` when ``is_job`` is true, otherwise ``StudentID``.
        hardskills: Skill titles the entity should be linked to.
        is_job: Selects ``JobsHard_skills`` (true) or ``StudentsHard_skills``.
        db: Active SQLAlchemy session.
    """
    if is_job:
        link_model, id_field = models.JobsHard_skills, "JobID"
    else:
        link_model, id_field = models.StudentsHard_skills, "StudentID"

    # Remove the existing skill links.
    db.query(link_model).filter(getattr(link_model, id_field) == entity_id).delete()
    db.flush()

    if not hardskills:
        # Nothing to link; skip the pointless empty IN () lookup.
        return

    # Add the new skill links.
    skills = db.query(models.Hard_skills).filter(
        models.Hard_skills.Title.in_(hardskills)
    )
    db.add_all(
        link_model(**{id_field: entity_id, "Hard_skillID": skill.Hard_skillID})
        for skill in skills
    )
    db.flush()


def _count_job_skills(db: Session, job_ids) -> Dict:
    """Return ``{JobID: number of JobsHard_skills rows}`` for ``job_ids``."""
    job_ids = list(job_ids)
    if not job_ids:
        return {}
    rows = (
        db.query(
            models.JobsHard_skills.JobID,
            func.count(models.JobsHard_skills.Hard_skillID),
        )
        .filter(models.JobsHard_skills.JobID.in_(job_ids))
        .group_by(models.JobsHard_skills.JobID)
        .all()
    )
    return {job_id: skill_count for job_id, skill_count in rows}


def update_matches(entity_id: int, is_job: bool, db: Session, background_tasks: BackgroundTasks) -> List[Dict]:
    """Recompute the ``Matches`` rows of one job or one student.

    For every (job, student) pair sharing at least one hard skill the match
    percentage is ``shared skills / skills linked to the job * 100``.  The
    entity's previous ``Matches`` rows are deleted and replaced; changes are
    flushed but not committed.  When the entity is a student, only jobs with
    ``Archive == False`` are considered.

    If at least one match was produced, a ``"matches_updated"`` webhook is
    scheduled on ``background_tasks``.

    Args:
        entity_id: ``JobID`` when ``is_job`` is true, otherwise ``StudentID``.
        is_job: Whether ``entity_id`` identifies a job or a student.
        db: Active SQLAlchemy session.
        background_tasks: FastAPI task queue used to send the webhook.

    Returns:
        One dict per match with the keys ``job_id``, ``student_id`` and
        ``match_percentage``.
    """
    job_skills = models.JobsHard_skills
    student_skills = models.StudentsHard_skills
    shared_skill_count = func.count(student_skills.Hard_skillID).label(
        "count_students_skills"
    )
    same_skill = job_skills.Hard_skillID == student_skills.Hard_skillID

    if is_job:
        matches = (
            db.query(job_skills.JobID, student_skills.StudentID, shared_skill_count)
            .join(job_skills, same_skill)
            .filter(job_skills.JobID == entity_id)
            .group_by(job_skills.JobID, student_skills.StudentID)
            .all()
        )
        stale_matches = models.Matches.JobID == entity_id
    else:
        matches = (
            db.query(student_skills.StudentID, job_skills.JobID, shared_skill_count)
            .join(models.Jobs, models.Jobs.JobID == job_skills.JobID)
            .join(student_skills, same_skill)
            .filter(
                # "== False" is intentional: it builds a SQL expression.
                models.Jobs.Archive == False,  # noqa: E712
                student_skills.StudentID == entity_id,
            )
            .group_by(student_skills.StudentID, job_skills.JobID)
            .all()
        )
        stale_matches = models.Matches.StudentID == entity_id

    db.query(models.Matches).filter(stale_matches).delete()
    db.flush()

    # One grouped query for the skill totals of all matched jobs, instead of
    # one COUNT query per match.  Every matched job has at least one skill
    # row (the match came from a join on it), so each lookup below succeeds
    # and the divisor is never zero.
    skills_per_job = _count_job_skills(db, {match.JobID for match in matches})

    updated_matches = []
    new_rows = []
    for match in matches:
        match_percentage = (
            match.count_students_skills / skills_per_job[match.JobID] * 100
        )
        updated_matches.append({
            "job_id": match.JobID,
            "student_id": match.StudentID,
            "match_percentage": match_percentage,
        })
        new_rows.append(
            models.Matches(
                StudentID=match.StudentID,
                JobID=match.JobID,
                Match=match_percentage,
            )
        )
    db.add_all(new_rows)
    db.flush()

    if updated_matches:
        background_tasks.add_task(send_webhook_background, "matches_updated", {
            "entity_type": "job" if is_job else "student",
            "entity_id": entity_id,
            "updated_matches": updated_matches,
        })

    return updated_matches


def update_record(model, model_id: int, updated_data: dict, db: Session, background_tasks: BackgroundTasks):
    """Update one record by ``ResponseID``, commit, and notify the webhook.

    Despite the generic name, ``model`` must expose the attributes
    ``ResponseID``, ``StudentID``, ``JobID``, ``Status``, ``Comment`` and
    ``Link``, because they are read to build the webhook payload.

    Args:
        model: ORM class to update.
        model_id: ``ResponseID`` of the record to update.
        updated_data: Column values passed to ``Query.update``.
        db: Active SQLAlchemy session; it is committed by this function.
        background_tasks: FastAPI task queue used to send the webhook.

    Returns:
        The updated ORM instance.

    Raises:
        HTTPException: 404 if no record has the given ``ResponseID``;
            409 if the commit fails.
    """
    query = db.query(model).filter(model.ResponseID == model_id)
    record = query.first()
    if record is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    query.update(updated_data, synchronize_session=False)
    db.flush()
    commit_transaction(db)

    # The bulk update bypasses the loaded instance (synchronize_session is
    # off), so reload it explicitly rather than relying on the session being
    # configured to expire objects on commit.
    db.refresh(record)

    post_update_data = {
        "response_id": record.ResponseID,
        "student_id": record.StudentID,
        "job_id": record.JobID,
        "status": record.Status,
        "comment": record.Comment,
        "link": record.Link,
    }
    background_tasks.add_task(send_webhook_response_updated, post_update_data)

    return record


def commit_transaction(db):
    """Commit the session, turning a failure into an HTTP 409 response.

    On failure the session is rolled back first.

    Raises:
        HTTPException: 409 Conflict if ``db.commit()`` raises.
    """
    try:
        db.commit()
    except Exception as exc:
        # "except Exception" (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate; chaining keeps the root cause in tracebacks.
        db.rollback()
        raise HTTPException(status_code=status.HTTP_409_CONFLICT) from exc