fix: 优化历史记录填充逻辑,简化字段更新和计算

- 更新历史记录填充逻辑,移除冗余字段,确保只使用必要的变动后字段
- 调整工龄计算和排序逻辑,确保数据准确性
- 修复晋级记录和工资计算中的字段引用,提升代码可读性
This commit is contained in:
Miu Li 2025-06-14 05:54:24 +08:00
parent 2eb841639e
commit e97448fc51

39
main.py
View File

@ -358,6 +358,7 @@ def main():
"工龄", "五年1级年份", "两年1档年份", # 简单计算更新
"变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资", "变动前工资合计", # 排序后更新
"变动后级别档次", "变动后职务工资", "变动后级别工资", "变动后津贴工资", "变动后工资合计"]) # 复杂计算更新
# 添加入职记录
History_pd.loc[len(History_pd), ["变动后时间","变动后职务","变动原因","变动后级别档次"]] = [
row["入职时间"],row["初始职务"],"套改/定级",row["入职时的初始级别"]]
@ -372,16 +373,16 @@ def main():
calctime += relativedelta(years=row["晋档间隔"])
if calctime > NOWTIME:
break
History_pd.loc[len(History_pd),["变动后时间","变动原因","五年1级年份","两年1档年份"]] = [
calctime,"两年晋档",calctime.year-row["晋级起始"].year,calctime.year-row["晋档起始"].year]
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [
calctime,"两年晋档"]
calctime=row["晋级起始"]
# 添加晋级记录
while True:
calctime += relativedelta(years=row["晋级间隔"])
if calctime > NOWTIME:
break
History_pd.loc[len(History_pd),["变动后时间","变动原因","五年1级年份","两年1档年份"]] = [
calctime,"五年晋级",calctime.year-row["晋级起始"].year,calctime.year-row["晋档起始"].year]
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [
calctime,"五年晋级"]
except:
raise Exception(f"晋级、档起始或间隔时间格式错误:{row['晋级起始']}-{row['晋档起始']}-{row['晋级间隔']}-{row['晋档间隔']}")
# 工资调标
@ -390,19 +391,25 @@ def main():
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [rule["start"], "工资调标"]
History_pd["身份证号码"] = row["身份证号码"]
History_pd["姓名"] = row["姓名"]
History_pd["工龄"] = History_pd.apply(lambda x: calculate_seniority(row, x["变动后时间"]), axis=1)
History_pd = History_pd.sort_values(by="时间").reset_index(drop=True)
History_pd["工龄"] = History_pd.apply(lambda x: calculate_seniority(row, x["变动后时间"].year), axis=1)
History_pd = History_pd.sort_values(by="变动后时间").reset_index(drop=True)
if History_pd.at[0,"时间"] != row["入职时间"]:
raise Exception(f"入职时间晚于其他时间:{row['入职时间']} < {History_pd.at[0,'时间']} ({History_pd.at[0,'变动原因']})")
if History_pd.at[0,"变动后时间"] != row["入职时间"]:
raise Exception(f"入职时间晚于其他时间:{row['入职时间']} < {History_pd.at[0,'变动后时间']} ({History_pd.at[0,'变动原因']})")
# 复杂数据计算
for index, hrow in History_pd.iterrows():
# 继承上一条复杂计算数据
if index > 0:
History_pd.loc[index,["变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资", "变动前工资合计"]] = History_pd.loc[index - 1,["变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资", "变动前工资合计"]]
History_pd.at[index, "变动前时间"] = History_pd.at[index-1, "变动后时间"]
History_pd.at[index, "变动前职务"] = History_pd.at[index-1, "变动后职务"]
History_pd.at[index, "变动前级别档次"] = History_pd.at[index-1, "变动后级别档次"]
History_pd.at[index, "变动前职务工资"] = History_pd.at[index-1, "变动后职务工资"]
History_pd.at[index, "变动前级别工资"] = History_pd.at[index-1, "变动后级别工资"]
History_pd.at[index, "变动前津贴工资"] = History_pd.at[index-1, "变动后津贴工资"]
History_pd.at[index, "变动前工资合计"] = History_pd.at[index-1, "变动后工资合计"]
# 继承名称
if hrow["变动后职务"] == "":
if pd.isna(hrow["变动后职务"]):
History_pd.at[index,"变动后职务"] = History_pd.at[index,"变动前职务"]
# 名称变化
for rule in Rule_RoleName:
@ -415,7 +422,8 @@ def main():
if hrow["变动原因"] == "两年晋档":
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
elif hrow["变动原因"] == "五年晋级":
if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["职务"]):
if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["变动后职务"]):
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
else:
History_pd.at[index, "变动后级别档次"] = f"{jb-1}-{dc-1}"
@ -427,7 +435,6 @@ def main():
new_jb = jb + Promote_Level[role][0]
new_dc = dc + Promote_Level[role][1]
if pd.isna(new_jb) or pd.isna(new_dc):
print(Promote_Level[role][1])
raise Exception(f"级别档次计算出现NaN值[{new_jb}]-[{new_dc}]({role})")
else:
new_jb = int(new_jb)
@ -447,9 +454,9 @@ def main():
else:
History_pd.at[index, "变动后级别档次"] = History_pd.at[index, "变动前级别档次"]
# 计算工资
History_pd.at[index, "变动后职务工资"] = role_salary(History_pd.iloc[index]["职务"], hrow["时间"])
History_pd.at[index, "变动后级别工资"] = level_salary(History_pd.iloc[index]["级别档次"], hrow["时间"])
History_pd.at[index, "变动后工资合计"] = to_int(History_pd.iloc[index]["职务工资"]) + to_int(History_pd.iloc[index]["级别工资"])
History_pd.at[index, "变动后职务工资"] = role_salary(History_pd.iloc[index]["变动后职务"], hrow["变动后时间"])
History_pd.at[index, "变动后级别工资"] = level_salary(History_pd.iloc[index]["变动后级别档次"], hrow["变动后时间"])
History_pd.at[index, "变动后工资合计"] = to_int(History_pd.iloc[index]["变动后职务工资"]) + to_int(History_pd.iloc[index]["变动后级别工资"])
fill_history_info(ws, History_pd)# 填充历史记录
@ -461,7 +468,7 @@ def main():
logging.error(f"{row['身份证号码']}:{e}")
# 保存所有历史记录到Excel文件
all_history["时间"] = all_history["时间"].apply(lambda x: format_time_ymd(x, "历史记录时间"))
all_history["变动后时间"] = all_history["变动后时间"].apply(lambda x: format_time_ymd(x, "历史记录时间"))
all_history.to_excel("所有人员历史记录.xlsx", index=False)
logging.info("所有人员历史记录已保存到'所有人员历史记录.xlsx'")