Compare commits

..

8 Commits

Author SHA1 Message Date
30867f302c feat: 添加五年1级和两年1档年份计算字段
- 在历史记录中新增五年1级年份和两年1档年份的计算逻辑
- 更新变动前时间字段的格式化处理,确保数据一致性
2025-06-14 06:05:46 +08:00
e97448fc51 fix: 优化历史记录填充逻辑,简化字段更新和计算
- 更新历史记录填充逻辑,移除冗余字段,确保只使用必要的变动后字段
- 调整工龄计算和排序逻辑,确保数据准确性
- 修复晋级记录和工资计算中的字段引用,提升代码可读性
2025-06-14 05:54:24 +08:00
2eb841639e feat: 添加format_time_ymd函数并更新历史记录时间格式
新增format_time_ymd函数用于将时间格式化为"年.月.日"形式
修改main函数中使用format_time_ymd替代原format_time处理历史记录时间
2025-06-14 04:26:35 +08:00
17d653ee7a fix: 更新历史记录填充逻辑以使用变动后字段
将历史记录填充函数中的字段从原始字段改为变动后字段,以正确反映历史记录中的最新数据。同时保留时间字段的异常处理逻辑。
2025-06-14 04:26:24 +08:00
11eb67a1d8 fix: 调整员工历史记录中身份证和姓名的赋值位置
将身份证号码和姓名的赋值操作移到循环内部,确保每条历史记录都包含正确的个人信息
2025-06-14 04:22:41 +08:00
ced4de4d4b refactor(历史记录计算): 重构历史记录计算逻辑,优化字段顺序和继承关系
调整历史记录字段顺序,将复杂计算字段移到后面
重构数据继承逻辑,简化职务和级别档次的处理
优化晋升校验规则和工资计算流程
2025-06-14 04:21:37 +08:00
d5ef9972a1 feat: 在历史记录中添加工资合计字段并优化计算逻辑
- 在变动前后工资字段中添加"工资合计"字段
- 优化晋档和晋级记录的计算逻辑,同时更新五年1级和两年1档年份
- 添加工龄计算功能
2025-06-14 03:43:17 +08:00
2b7d8ebb4e refactor(薪资计算): 重构历史薪资记录数据结构
优化历史薪资记录DataFrame的列结构,增加更多详细信息字段
统一数据填充方式,使用列名直接赋值提高可读性
2025-06-14 03:19:34 +08:00

133
main.py
View File

@ -48,6 +48,14 @@ def format_time(dt,info):
logging.warning(f"[{info}]时间格式错误:{dt}") logging.warning(f"[{info}]时间格式错误:{dt}")
return dt return dt
def format_time_ymd(dt,info):
try:
return dt.strftime("%Y.%m.%d")
except:
logging.warning(f"[{info}]时间格式错误:{dt}")
return dt
def to_int(x): def to_int(x):
try: try:
return int(x) return int(x)
@ -269,14 +277,14 @@ def fill_history_info(ws, History_pd):# 填充历史记录
for col in range(1, 11): # 复制样式 for col in range(1, 11): # 复制样式
ws.cell(row=H_START+index, column=col)._style = ws.cell(row=H_START, column=col)._style ws.cell(row=H_START+index, column=col)._style = ws.cell(row=H_START, column=col)._style
try: try:
ws.cell(row=H_START+index, column=1, value=format_time(hrow["时间"],"历史时间")) ws.cell(row=H_START+index, column=1, value=format_time(hrow["变动后时间"],"历史时间"))
except: except:
logging.warning(f"历史时间格式错误:{hrow['时间']}") logging.warning(f"历史时间格式错误:{hrow['时间']}")
ws.cell(row=H_START+index, column=2, value=hrow["职务"]) ws.cell(row=H_START+index, column=2, value=hrow["变动后职务"])
ws.cell(row=H_START+index, column=3, value=hrow["职务工资"]) ws.cell(row=H_START+index, column=3, value=hrow["变动后职务工资"])
ws.cell(row=H_START+index, column=4, value=hrow["级别档次"]) ws.cell(row=H_START+index, column=4, value=hrow["变动后级别档次"])
ws.cell(row=H_START+index, column=5, value=hrow["级别工资"]) ws.cell(row=H_START+index, column=5, value=hrow["变动后级别工资"])
ws.cell(row=H_START+index, column=6, value=hrow["工资合计"]) ws.cell(row=H_START+index, column=6, value=hrow["变动后工资合计"])
ws.cell(row=H_START+index, column=7, value=hrow["变动原因"]) ws.cell(row=H_START+index, column=7, value=hrow["变动原因"])
# ws.cell(row=H_START+index, column=8, value=index) # Debug # ws.cell(row=H_START+index, column=8, value=index) # Debug
@ -344,85 +352,117 @@ def main():
fill_prompt_info(ws, promote)# 填充晋升信息 fill_prompt_info(ws, promote)# 填充晋升信息
# 根据规则匹配职级薪资 # 根据规则匹配职级薪资
History_pd = pd.DataFrame(columns=["时间", "职务", "职务工资", "级别档次", "级别工资", "工资合计", "变动原因", "晋升备注"]) History_pd = pd.DataFrame(columns=[
"身份证号码", "姓名", # 统一填入
"变动后时间", "变动后职务", "变动原因", "晋升备注", # 直接填入
"工龄", "五年1级年份", "两年1档年份", # 简单计算更新
"变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资", "变动前工资合计", # 排序后更新
"变动后级别档次", "变动后职务工资", "变动后级别工资", "变动后津贴工资", "变动后工资合计"]) # 复杂计算更新
# 添加入职记录 # 添加入职记录
History_pd.loc[len(History_pd)] = [row["入职时间"], row["初始职务"], "", row["入职时的初始级别"], "", "", "套改/定级", ""] History_pd.loc[len(History_pd), ["变动后时间","变动后职务","变动原因","变动后级别档次"]] = [
row["入职时间"],row["初始职务"],"套改/定级",row["入职时的初始级别"]]
for index, prow in promote.iterrows(): # 添加晋升记录 for index, prow in promote.iterrows(): # 添加晋升记录
History_pd.loc[len(History_pd)] = [prow["工资执行时间"]+relativedelta(hours=prow["任职时间"].month,minutes=prow["任职时间"].day), prow["新职务"], "", "", "", "", "晋升", f"{prow['新职务']} {prow['变动批注'] if pd.notna(prow['变动批注']) else ''}"] History_pd.loc[len(History_pd),["变动后时间","变动后职务","变动原因","晋升备注"]] = [
prow["工资执行时间"]+relativedelta(hours=prow["任职时间"].month,minutes=prow["任职时间"].day),
prow["新职务"],"晋升",f"{prow['新职务']} {prow['变动批注'] if pd.notna(prow['变动批注']) else ''}"]
try: try:
# 添加晋档记录
calctime=row["晋档起始"] + relativedelta(minute=1) calctime=row["晋档起始"] + relativedelta(minute=1)
while True: # 添加晋档记录 while True:
calctime += relativedelta(years=row["晋档间隔"]) calctime += relativedelta(years=row["晋档间隔"])
if calctime > NOWTIME: if calctime > NOWTIME:
break break
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "两年晋档", ""] History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [
calctime,"两年晋档"]
calctime=row["晋级起始"] calctime=row["晋级起始"]
while True: # 添加晋级记录 # 添加晋级记录
while True:
calctime += relativedelta(years=row["晋级间隔"]) calctime += relativedelta(years=row["晋级间隔"])
if calctime > NOWTIME: if calctime > NOWTIME:
break break
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "五年晋级", ""] History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [
calctime,"五年晋级"]
except: except:
raise Exception(f"晋级、档起始或间隔时间格式错误:{row['晋级起始']}-{row['晋档起始']}-{row['晋级间隔']}-{row['晋档间隔']}") raise Exception(f"晋级、档起始或间隔时间格式错误:{row['晋级起始']}-{row['晋档起始']}-{row['晋级间隔']}-{row['晋档间隔']}")
for rule in Rule_Level: # 工资调标 # 工资调标
for rule in Rule_Level:
if row["入职时间"] < rule["start"]: if row["入职时间"] < rule["start"]:
History_pd.loc[len(History_pd)] = [rule["start"], "", "", "", "", "", "工资调标", ""] History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [rule["start"], "工资调标"]
History_pd = History_pd.sort_values(by="时间").reset_index(drop=True) History_pd["身份证号码"] = row["身份证号码"]
History_pd["姓名"] = row["姓名"]
History_pd["工龄"] = History_pd.apply(lambda x: calculate_seniority(row, x["变动后时间"].year), axis=1)
History_pd["五年1级年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋级起始"].year, axis=1)
History_pd["两年1档年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋档起始"].year, axis=1)
History_pd = History_pd.sort_values(by="变动后时间").reset_index(drop=True)
if History_pd.at[0,"时间"] != row["入职时间"]: if History_pd.at[0,"变动后时间"] != row["入职时间"]:
raise Exception(f"入职时间晚于其他时间:{row['入职时间']} < {History_pd.at[0,'时间']} ({History_pd.at[0,'变动原因']})") raise Exception(f"入职时间晚于其他时间:{row['入职时间']} < {History_pd.at[0,'变动后时间']} ({History_pd.at[0,'变动原因']})")
for index, hrow in History_pd.iterrows(): # 数据计算
# 调整职务职级 # 复杂数据计算
if index > 0 and hrow["职务"] == "": for index, hrow in History_pd.iterrows():
History_pd.at[index, "职务"] = History_pd.iloc[index-1]["职务"] # 继承上一条复杂计算数据
for rule in Rule_RoleName: # 名称变化 if index > 0:
if rule["start"] <= hrow["时间"] <= rule["end"]: History_pd.at[index, "变动前时间"] = History_pd.at[index-1, "变动后时间"]
if History_pd.iloc[index]["职务"] in rule["rule"]["原名称"].values: History_pd.at[index, "变动前职务"] = History_pd.at[index-1, "变动后职务"]
History_pd.at[index, "职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.iloc[index]["职务"]]["现名称"].values[0] History_pd.at[index, "变动前级别档次"] = History_pd.at[index-1, "变动后级别档次"]
if index > 0 and hrow["级别档次"] == "": History_pd.at[index, "变动前职务工资"] = History_pd.at[index-1, "变动后职务工资"]
jb, dc = split_level(History_pd.iloc[index-1]["级别档次"]) History_pd.at[index, "变动前级别工资"] = History_pd.at[index-1, "变动后级别工资"]
History_pd.at[index, "变动前津贴工资"] = History_pd.at[index-1, "变动后津贴工资"]
History_pd.at[index, "变动前工资合计"] = History_pd.at[index-1, "变动后工资合计"]
# 继承名称
if pd.isna(hrow["变动后职务"]):
History_pd.at[index,"变动后职务"] = History_pd.at[index,"变动前职务"]
# 名称变化
for rule in Rule_RoleName:
if rule["start"] <= hrow["变动后时间"] <= rule["end"]:
if History_pd.at[index,"变动后职务"] in rule["rule"]["原名称"].values:
History_pd.at[index,"变动后职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.at[index,"变动后职务"]]["现名称"].values[0]
# 级别档次
if index > 0:
jb, dc = split_level(History_pd.at[index,"变动前级别档次"])
if hrow["变动原因"] == "两年晋档": if hrow["变动原因"] == "两年晋档":
History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}" History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
elif hrow["变动原因"] == "五年晋级": elif hrow["变动原因"] == "五年晋级":
if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["职务"]): if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["变动后职务"]):
History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}"
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
else: else:
History_pd.at[index, "级别档次"] = f"{jb-1}-{dc-1}" History_pd.at[index, "变动后级别档次"] = f"{jb-1}-{dc-1}"
elif hrow["变动原因"] == "工资调标": elif hrow["变动原因"] == "工资调标":
History_pd.at[index, "级别档次"] = f"{jb}-{dc}" History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}"
elif hrow["变动原因"] == "晋升": elif hrow["变动原因"] == "晋升":
role = History_pd.iloc[index]["职务"] role = History_pd.iloc[index]["变动后职务"]
if role in Promote_Level.keys(): if role in Promote_Level.keys():
new_jb = jb + Promote_Level[role][0] new_jb = jb + Promote_Level[role][0]
new_dc = dc + Promote_Level[role][1] new_dc = dc + Promote_Level[role][1]
if pd.isna(new_jb) or pd.isna(new_dc): if pd.isna(new_jb) or pd.isna(new_dc):
print(Promote_Level[role][1])
raise Exception(f"级别档次计算出现NaN值[{new_jb}]-[{new_dc}]({role})") raise Exception(f"级别档次计算出现NaN值[{new_jb}]-[{new_dc}]({role})")
else: else:
new_jb = int(new_jb) new_jb = int(new_jb)
new_dc = int(new_dc) new_dc = int(new_dc)
if (History_pd.iloc[index-1]["职务"] in Promote_verify.iloc[:,0].values and if (History_pd.at[index,"变动后职务"] in Promote_verify.iloc[:,0].values and
role in Promote_verify.iloc[:,1].values): role in Promote_verify.iloc[:,1].values):
logging.info(f"[{row['身份证号码']}]命中晋升校验规则[{History_pd.iloc[index-1]['职务']}]->[{role}]") logging.info(f"[{row['身份证号码']}]命中晋升校验规则[{History_pd.at[index,'变动前职务']}]->[{role}]")
History_pd.at[index, "级别档次"] = f"{jb}-{dc}" History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}"
elif new_jb < role_limit(role): elif new_jb < role_limit(role):
History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}" History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
elif new_jb < 1 or new_dc < 1: elif new_jb < 1 or new_dc < 1:
raise Exception(f"级别档次小于0[{new_jb}]-[{new_dc}]") raise Exception(f"级别档次小于0[{new_jb}]-[{new_dc}]")
else: else:
History_pd.at[index, "级别档次"] = f"{new_jb}-{new_dc}" History_pd.at[index, "变动后级别档次"] = f"{new_jb}-{new_dc}"
else: else:
logging.warning(f"职位[{role}]不存在职级上限规则") logging.warning(f"职位[{role}]不存在职级上限规则")
else:
History_pd.at[index, "变动后级别档次"] = History_pd.at[index, "变动前级别档次"]
# 计算工资 # 计算工资
History_pd.at[index, "职务工资"] = role_salary(History_pd.iloc[index]["职务"], hrow["时间"]) History_pd.at[index, "变动后职务工资"] = role_salary(History_pd.iloc[index]["变动后职务"], hrow["变动后时间"])
History_pd.at[index, "级别工资"] = level_salary(History_pd.iloc[index]["级别档次"], hrow["时间"]) History_pd.at[index, "变动后级别工资"] = level_salary(History_pd.iloc[index]["变动后级别档次"], hrow["变动后时间"])
History_pd.at[index, "工资合计"] = to_int(History_pd.iloc[index]["职务工资"]) + to_int(History_pd.iloc[index]["级别工资"]) History_pd.at[index, "变动后工资合计"] = to_int(History_pd.iloc[index]["变动后职务工资"]) + to_int(History_pd.iloc[index]["变动后级别工资"])
fill_history_info(ws, History_pd)# 填充历史记录 fill_history_info(ws, History_pd)# 填充历史记录
# 将当前人员的历史记录添加到总表中 # 将当前人员的历史记录添加到总表中
History_pd["身份证号码"] = row["身份证号码"]
History_pd["姓名"] = row["姓名"]
all_history = pd.concat([all_history, History_pd], ignore_index=True) all_history = pd.concat([all_history, History_pd], ignore_index=True)
wb.save(f"个人台账/{row['身份证号码']}_{row['姓名']}.xlsx") wb.save(f"个人台账/{row['身份证号码']}_{row['姓名']}.xlsx")
@ -430,7 +470,8 @@ def main():
logging.error(f"{row['身份证号码']}:{e}") logging.error(f"{row['身份证号码']}:{e}")
# 保存所有历史记录到Excel文件 # 保存所有历史记录到Excel文件
all_history["时间"] = all_history["时间"].apply(lambda x: format_time(x, "历史记录时间")) all_history["变动后时间"] = all_history["变动后时间"].apply(lambda x: format_time_ymd(x, "历史记录时间"))
all_history["变动前时间"] = all_history["变动前时间"].apply(lambda x: format_time_ymd(x, "历史记录时间"))
all_history.to_excel("所有人员历史记录.xlsx", index=False) all_history.to_excel("所有人员历史记录.xlsx", index=False)
logging.info("所有人员历史记录已保存到'所有人员历史记录.xlsx'") logging.info("所有人员历史记录已保存到'所有人员历史记录.xlsx'")