From c374dcf40cb09b4deb307dcd1e0963a07a5568fa Mon Sep 17 00:00:00 2001 From: mxr612 Date: Sat, 14 Jun 2025 06:15:09 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=EF=BC=88=E5=87=BD=E6=95=B0=E5=8C=96his?= =?UTF-8?q?tory=E6=B7=BB=E5=8A=A0=E5=92=8C=E8=AE=A1=E7=AE=97=EF=BC=89?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=8E=86=E5=8F=B2=E8=AE=B0=E5=BD=95=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=87=BD=E6=95=B0=E4=BB=A5=E7=AE=80=E5=8C=96=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=A1=AB=E5=85=85=E5=92=8C=E8=AE=A1=E7=AE=97=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增add_history和calc_history函数,整合入职、晋升、晋档和晋级记录的处理逻辑 - 优化历史记录的字段更新和复杂数据计算,提升代码可读性和维护性 - 调整历史记录填充顺序,确保数据一致性和准确性 --- main.py | 198 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 102 insertions(+), 96 deletions(-) diff --git a/main.py b/main.py index d0d4e38..83acfb0 100644 --- a/main.py +++ b/main.py @@ -272,6 +272,105 @@ def fill_prompt_info(ws, promote):# 填充晋升信息 ws.cell(row=P_START+index, column=2, value=prow["变动批注"]) ws.cell(row=P_START+index, column=3, value="任"+prow["新职务"]) +def add_history(History_pd, row, promote): + # 添加入职记录 + History_pd.loc[len(History_pd), ["变动后时间","变动后职务","变动原因","变动后级别档次"]] = [ + row["入职时间"],row["初始职务"],"套改/定级",row["入职时的初始级别"]] + for index, prow in promote.iterrows(): # 添加晋升记录 + History_pd.loc[len(History_pd),["变动后时间","变动后职务","变动原因","晋升备注"]] = [ + prow["工资执行时间"]+relativedelta(hours=prow["任职时间"].month,minutes=prow["任职时间"].day), + prow["新职务"],"晋升",f"任{prow['新职务']} {prow['变动批注'] if pd.notna(prow['变动批注']) else ''}"] + try: + # 添加晋档记录 + calctime=row["晋档起始"] + relativedelta(minute=1) + while True: + calctime += relativedelta(years=row["晋档间隔"]) + if calctime > NOWTIME: + break + History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [ + calctime,"两年晋档"] + calctime=row["晋级起始"] + # 添加晋级记录 + while True: + calctime += relativedelta(years=row["晋级间隔"]) + if calctime > NOWTIME: + break + History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [ + calctime,"五年晋级"] + except: + raise Exception(f"晋级、档起始或间隔时间格式错误:{row['晋级起始']}-{row['晋档起始']}-{row['晋级间隔']}-{row['晋档间隔']}") + # 工资调标 + for rule in Rule_Level: + if row["入职时间"] < rule["start"]: + History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [rule["start"], "工资调标"] + History_pd["身份证号码"] = row["身份证号码"] + History_pd["姓名"] = row["姓名"] + History_pd["工龄"] = History_pd.apply(lambda x: calculate_seniority(row, x["变动后时间"].year), axis=1) + History_pd["五年1级年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋级起始"].year, axis=1) + History_pd["两年1档年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋档起始"].year, axis=1) + +def calc_history(History_pd, row): + # 复杂数据计算 + for index, hrow in History_pd.iterrows(): + # 继承上一条复杂计算数据 + if index > 0: + History_pd.at[index, "变动前时间"] = History_pd.at[index-1, "变动后时间"] + History_pd.at[index, "变动前职务"] = History_pd.at[index-1, "变动后职务"] + History_pd.at[index, "变动前级别档次"] = History_pd.at[index-1, "变动后级别档次"] + History_pd.at[index, "变动前职务工资"] = History_pd.at[index-1, "变动后职务工资"] + History_pd.at[index, "变动前级别工资"] = History_pd.at[index-1, "变动后级别工资"] + History_pd.at[index, "变动前津贴工资"] = History_pd.at[index-1, "变动后津贴工资"] + History_pd.at[index, "变动前工资合计"] = History_pd.at[index-1, "变动后工资合计"] + # 继承名称 + if pd.isna(hrow["变动后职务"]): + History_pd.at[index,"变动后职务"] = History_pd.at[index,"变动前职务"] + # 名称变化 + for rule in Rule_RoleName: + if rule["start"] <= hrow["变动后时间"] <= rule["end"]: + if History_pd.at[index,"变动后职务"] in rule["rule"]["原名称"].values: + History_pd.at[index,"变动后职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.at[index,"变动后职务"]]["现名称"].values[0] + # 级别档次 + if index > 0: + jb, dc = split_level(History_pd.at[index,"变动前级别档次"]) + if hrow["变动原因"] == "两年晋档": + History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}" + elif hrow["变动原因"] == "五年晋级": + if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["变动后职务"]): + + History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}" + else: + History_pd.at[index, "变动后级别档次"] = f"{jb-1}-{dc-1}" + elif hrow["变动原因"] == "工资调标": + History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}" + elif hrow["变动原因"] == "晋升": + role = History_pd.iloc[index]["变动后职务"] + if role in Promote_Level.keys(): + new_jb = jb + Promote_Level[role][0] + new_dc = dc + Promote_Level[role][1] + if pd.isna(new_jb) or pd.isna(new_dc): + raise Exception(f"级别档次计算出现NaN值:[{new_jb}]-[{new_dc}]({role})") + else: + new_jb = int(new_jb) + new_dc = int(new_dc) + if (History_pd.at[index,"变动后职务"] in Promote_verify.iloc[:,0].values and + role in Promote_verify.iloc[:,1].values): + logging.info(f"[{row['身份证号码']}]命中晋升校验规则[{History_pd.at[index,'变动前职务']}]->[{role}]") + History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}" + elif new_jb < role_limit(role): + History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}" + elif new_jb < 1 or new_dc < 1: + raise Exception(f"级别档次小于0:[{new_jb}]-[{new_dc}]") + else: + History_pd.at[index, "变动后级别档次"] = f"{new_jb}-{new_dc}" + else: + logging.warning(f"职位[{role}]不存在职级上限规则") + else: + History_pd.at[index, "变动后级别档次"] = History_pd.at[index, "变动前级别档次"] + # 计算工资 + History_pd.at[index, "变动后职务工资"] = role_salary(History_pd.iloc[index]["变动后职务"], hrow["变动后时间"]) + History_pd.at[index, "变动后级别工资"] = level_salary(History_pd.iloc[index]["变动后级别档次"], hrow["变动后时间"]) + History_pd.at[index, "变动后工资合计"] = to_int(History_pd.iloc[index]["变动后职务工资"]) + to_int(History_pd.iloc[index]["变动后级别工资"]) + def fill_history_info(ws, History_pd):# 填充历史记录 for index, hrow in History_pd.iterrows(): # 打印 for col in range(1, 11): # 复制样式 @@ -359,106 +458,13 @@ def main(): "变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资", "变动前工资合计", # 排序后更新 "变动后级别档次", "变动后职务工资", "变动后级别工资", "变动后津贴工资", "变动后工资合计"]) # 复杂计算更新 - # 添加入职记录 - History_pd.loc[len(History_pd), ["变动后时间","变动后职务","变动原因","变动后级别档次"]] = [ - row["入职时间"],row["初始职务"],"套改/定级",row["入职时的初始级别"]] - for index, prow in promote.iterrows(): # 添加晋升记录 - History_pd.loc[len(History_pd),["变动后时间","变动后职务","变动原因","晋升备注"]] = [ - prow["工资执行时间"]+relativedelta(hours=prow["任职时间"].month,minutes=prow["任职时间"].day), - prow["新职务"],"晋升",f"任{prow['新职务']} {prow['变动批注'] if pd.notna(prow['变动批注']) else ''}"] - try: - # 添加晋档记录 - calctime=row["晋档起始"] + relativedelta(minute=1) - while True: - calctime += relativedelta(years=row["晋档间隔"]) - if calctime > NOWTIME: - break - History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [ - calctime,"两年晋档"] - calctime=row["晋级起始"] - # 添加晋级记录 - while True: - calctime += relativedelta(years=row["晋级间隔"]) - if calctime > NOWTIME: - break - History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [ - calctime,"五年晋级"] - except: - raise Exception(f"晋级、档起始或间隔时间格式错误:{row['晋级起始']}-{row['晋档起始']}-{row['晋级间隔']}-{row['晋档间隔']}") - # 工资调标 - for rule in Rule_Level: - if row["入职时间"] < rule["start"]: - History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [rule["start"], "工资调标"] - History_pd["身份证号码"] = row["身份证号码"] - History_pd["姓名"] = row["姓名"] - History_pd["工龄"] = History_pd.apply(lambda x: calculate_seniority(row, x["变动后时间"].year), axis=1) - History_pd["五年1级年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋级起始"].year, axis=1) - History_pd["两年1档年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋档起始"].year, axis=1) - History_pd = History_pd.sort_values(by="变动后时间").reset_index(drop=True) + add_history(History_pd, row, promote) + History_pd = History_pd.sort_values(by="变动后时间").reset_index(drop=True) if History_pd.at[0,"变动后时间"] != row["入职时间"]: raise Exception(f"入职时间晚于其他时间:{row['入职时间']} < {History_pd.at[0,'变动后时间']} ({History_pd.at[0,'变动原因']})") - # 复杂数据计算 - for index, hrow in History_pd.iterrows(): - # 继承上一条复杂计算数据 - if index > 0: - History_pd.at[index, "变动前时间"] = History_pd.at[index-1, "变动后时间"] - History_pd.at[index, "变动前职务"] = History_pd.at[index-1, "变动后职务"] - History_pd.at[index, "变动前级别档次"] = History_pd.at[index-1, "变动后级别档次"] - History_pd.at[index, "变动前职务工资"] = History_pd.at[index-1, "变动后职务工资"] - History_pd.at[index, "变动前级别工资"] = History_pd.at[index-1, "变动后级别工资"] - History_pd.at[index, "变动前津贴工资"] = History_pd.at[index-1, "变动后津贴工资"] - History_pd.at[index, "变动前工资合计"] = History_pd.at[index-1, "变动后工资合计"] - # 继承名称 - if pd.isna(hrow["变动后职务"]): - History_pd.at[index,"变动后职务"] = History_pd.at[index,"变动前职务"] - # 名称变化 - for rule in Rule_RoleName: - if rule["start"] <= hrow["变动后时间"] <= rule["end"]: - if History_pd.at[index,"变动后职务"] in rule["rule"]["原名称"].values: - History_pd.at[index,"变动后职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.at[index,"变动后职务"]]["现名称"].values[0] - # 级别档次 - if index > 0: - jb, dc = split_level(History_pd.at[index,"变动前级别档次"]) - if hrow["变动原因"] == "两年晋档": - History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}" - elif hrow["变动原因"] == "五年晋级": - if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["变动后职务"]): - - History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}" - else: - History_pd.at[index, "变动后级别档次"] = f"{jb-1}-{dc-1}" - elif hrow["变动原因"] == "工资调标": - History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}" - elif hrow["变动原因"] == "晋升": - role = History_pd.iloc[index]["变动后职务"] - if role in Promote_Level.keys(): - new_jb = jb + Promote_Level[role][0] - new_dc = dc + Promote_Level[role][1] - if pd.isna(new_jb) or pd.isna(new_dc): - raise Exception(f"级别档次计算出现NaN值:[{new_jb}]-[{new_dc}]({role})") - else: - new_jb = int(new_jb) - new_dc = int(new_dc) - if (History_pd.at[index,"变动后职务"] in Promote_verify.iloc[:,0].values and - role in Promote_verify.iloc[:,1].values): - logging.info(f"[{row['身份证号码']}]命中晋升校验规则[{History_pd.at[index,'变动前职务']}]->[{role}]") - History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}" - elif new_jb < role_limit(role): - History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}" - elif new_jb < 1 or new_dc < 1: - raise Exception(f"级别档次小于0:[{new_jb}]-[{new_dc}]") - else: - History_pd.at[index, "变动后级别档次"] = f"{new_jb}-{new_dc}" - else: - logging.warning(f"职位[{role}]不存在职级上限规则") - else: - History_pd.at[index, "变动后级别档次"] = History_pd.at[index, "变动前级别档次"] - # 计算工资 - History_pd.at[index, "变动后职务工资"] = role_salary(History_pd.iloc[index]["变动后职务"], hrow["变动后时间"]) - History_pd.at[index, "变动后级别工资"] = level_salary(History_pd.iloc[index]["变动后级别档次"], hrow["变动后时间"]) - History_pd.at[index, "变动后工资合计"] = to_int(History_pd.iloc[index]["变动后职务工资"]) + to_int(History_pd.iloc[index]["变动后级别工资"]) + calc_history(History_pd, row) fill_history_info(ws, History_pd)# 填充历史记录