feat: (函数化history添加和计算)添加历史记录处理函数以简化数据填充和计算逻辑
- 新增add_history和calc_history函数,整合入职、晋升、晋档和晋级记录的处理逻辑 - 优化历史记录的字段更新和复杂数据计算,提升代码可读性和维护性 - 调整历史记录填充顺序,确保数据一致性和准确性
This commit is contained in:
parent
30867f302c
commit
c374dcf40c
198
main.py
198
main.py
@ -272,6 +272,105 @@ def fill_prompt_info(ws, promote):# 填充晋升信息
|
||||
ws.cell(row=P_START+index, column=2, value=prow["变动批注"])
|
||||
ws.cell(row=P_START+index, column=3, value="任"+prow["新职务"])
|
||||
|
||||
def add_history(History_pd, row, promote):
|
||||
# 添加入职记录
|
||||
History_pd.loc[len(History_pd), ["变动后时间","变动后职务","变动原因","变动后级别档次"]] = [
|
||||
row["入职时间"],row["初始职务"],"套改/定级",row["入职时的初始级别"]]
|
||||
for index, prow in promote.iterrows(): # 添加晋升记录
|
||||
History_pd.loc[len(History_pd),["变动后时间","变动后职务","变动原因","晋升备注"]] = [
|
||||
prow["工资执行时间"]+relativedelta(hours=prow["任职时间"].month,minutes=prow["任职时间"].day),
|
||||
prow["新职务"],"晋升",f"任{prow['新职务']} {prow['变动批注'] if pd.notna(prow['变动批注']) else ''}"]
|
||||
try:
|
||||
# 添加晋档记录
|
||||
calctime=row["晋档起始"] + relativedelta(minute=1)
|
||||
while True:
|
||||
calctime += relativedelta(years=row["晋档间隔"])
|
||||
if calctime > NOWTIME:
|
||||
break
|
||||
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [
|
||||
calctime,"两年晋档"]
|
||||
calctime=row["晋级起始"]
|
||||
# 添加晋级记录
|
||||
while True:
|
||||
calctime += relativedelta(years=row["晋级间隔"])
|
||||
if calctime > NOWTIME:
|
||||
break
|
||||
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [
|
||||
calctime,"五年晋级"]
|
||||
except:
|
||||
raise Exception(f"晋级、档起始或间隔时间格式错误:{row['晋级起始']}-{row['晋档起始']}-{row['晋级间隔']}-{row['晋档间隔']}")
|
||||
# 工资调标
|
||||
for rule in Rule_Level:
|
||||
if row["入职时间"] < rule["start"]:
|
||||
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [rule["start"], "工资调标"]
|
||||
History_pd["身份证号码"] = row["身份证号码"]
|
||||
History_pd["姓名"] = row["姓名"]
|
||||
History_pd["工龄"] = History_pd.apply(lambda x: calculate_seniority(row, x["变动后时间"].year), axis=1)
|
||||
History_pd["五年1级年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋级起始"].year, axis=1)
|
||||
History_pd["两年1档年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋档起始"].year, axis=1)
|
||||
|
||||
def calc_history(History_pd, row):
|
||||
# 复杂数据计算
|
||||
for index, hrow in History_pd.iterrows():
|
||||
# 继承上一条复杂计算数据
|
||||
if index > 0:
|
||||
History_pd.at[index, "变动前时间"] = History_pd.at[index-1, "变动后时间"]
|
||||
History_pd.at[index, "变动前职务"] = History_pd.at[index-1, "变动后职务"]
|
||||
History_pd.at[index, "变动前级别档次"] = History_pd.at[index-1, "变动后级别档次"]
|
||||
History_pd.at[index, "变动前职务工资"] = History_pd.at[index-1, "变动后职务工资"]
|
||||
History_pd.at[index, "变动前级别工资"] = History_pd.at[index-1, "变动后级别工资"]
|
||||
History_pd.at[index, "变动前津贴工资"] = History_pd.at[index-1, "变动后津贴工资"]
|
||||
History_pd.at[index, "变动前工资合计"] = History_pd.at[index-1, "变动后工资合计"]
|
||||
# 继承名称
|
||||
if pd.isna(hrow["变动后职务"]):
|
||||
History_pd.at[index,"变动后职务"] = History_pd.at[index,"变动前职务"]
|
||||
# 名称变化
|
||||
for rule in Rule_RoleName:
|
||||
if rule["start"] <= hrow["变动后时间"] <= rule["end"]:
|
||||
if History_pd.at[index,"变动后职务"] in rule["rule"]["原名称"].values:
|
||||
History_pd.at[index,"变动后职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.at[index,"变动后职务"]]["现名称"].values[0]
|
||||
# 级别档次
|
||||
if index > 0:
|
||||
jb, dc = split_level(History_pd.at[index,"变动前级别档次"])
|
||||
if hrow["变动原因"] == "两年晋档":
|
||||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
|
||||
elif hrow["变动原因"] == "五年晋级":
|
||||
if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["变动后职务"]):
|
||||
|
||||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
|
||||
else:
|
||||
History_pd.at[index, "变动后级别档次"] = f"{jb-1}-{dc-1}"
|
||||
elif hrow["变动原因"] == "工资调标":
|
||||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}"
|
||||
elif hrow["变动原因"] == "晋升":
|
||||
role = History_pd.iloc[index]["变动后职务"]
|
||||
if role in Promote_Level.keys():
|
||||
new_jb = jb + Promote_Level[role][0]
|
||||
new_dc = dc + Promote_Level[role][1]
|
||||
if pd.isna(new_jb) or pd.isna(new_dc):
|
||||
raise Exception(f"级别档次计算出现NaN值:[{new_jb}]-[{new_dc}]({role})")
|
||||
else:
|
||||
new_jb = int(new_jb)
|
||||
new_dc = int(new_dc)
|
||||
if (History_pd.at[index,"变动后职务"] in Promote_verify.iloc[:,0].values and
|
||||
role in Promote_verify.iloc[:,1].values):
|
||||
logging.info(f"[{row['身份证号码']}]命中晋升校验规则[{History_pd.at[index,'变动前职务']}]->[{role}]")
|
||||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}"
|
||||
elif new_jb < role_limit(role):
|
||||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
|
||||
elif new_jb < 1 or new_dc < 1:
|
||||
raise Exception(f"级别档次小于0:[{new_jb}]-[{new_dc}]")
|
||||
else:
|
||||
History_pd.at[index, "变动后级别档次"] = f"{new_jb}-{new_dc}"
|
||||
else:
|
||||
logging.warning(f"职位[{role}]不存在职级上限规则")
|
||||
else:
|
||||
History_pd.at[index, "变动后级别档次"] = History_pd.at[index, "变动前级别档次"]
|
||||
# 计算工资
|
||||
History_pd.at[index, "变动后职务工资"] = role_salary(History_pd.iloc[index]["变动后职务"], hrow["变动后时间"])
|
||||
History_pd.at[index, "变动后级别工资"] = level_salary(History_pd.iloc[index]["变动后级别档次"], hrow["变动后时间"])
|
||||
History_pd.at[index, "变动后工资合计"] = to_int(History_pd.iloc[index]["变动后职务工资"]) + to_int(History_pd.iloc[index]["变动后级别工资"])
|
||||
|
||||
def fill_history_info(ws, History_pd):# 填充历史记录
|
||||
for index, hrow in History_pd.iterrows(): # 打印
|
||||
for col in range(1, 11): # 复制样式
|
||||
@ -359,106 +458,13 @@ def main():
|
||||
"变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资", "变动前工资合计", # 排序后更新
|
||||
"变动后级别档次", "变动后职务工资", "变动后级别工资", "变动后津贴工资", "变动后工资合计"]) # 复杂计算更新
|
||||
|
||||
# 添加入职记录
|
||||
History_pd.loc[len(History_pd), ["变动后时间","变动后职务","变动原因","变动后级别档次"]] = [
|
||||
row["入职时间"],row["初始职务"],"套改/定级",row["入职时的初始级别"]]
|
||||
for index, prow in promote.iterrows(): # 添加晋升记录
|
||||
History_pd.loc[len(History_pd),["变动后时间","变动后职务","变动原因","晋升备注"]] = [
|
||||
prow["工资执行时间"]+relativedelta(hours=prow["任职时间"].month,minutes=prow["任职时间"].day),
|
||||
prow["新职务"],"晋升",f"任{prow['新职务']} {prow['变动批注'] if pd.notna(prow['变动批注']) else ''}"]
|
||||
try:
|
||||
# 添加晋档记录
|
||||
calctime=row["晋档起始"] + relativedelta(minute=1)
|
||||
while True:
|
||||
calctime += relativedelta(years=row["晋档间隔"])
|
||||
if calctime > NOWTIME:
|
||||
break
|
||||
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [
|
||||
calctime,"两年晋档"]
|
||||
calctime=row["晋级起始"]
|
||||
# 添加晋级记录
|
||||
while True:
|
||||
calctime += relativedelta(years=row["晋级间隔"])
|
||||
if calctime > NOWTIME:
|
||||
break
|
||||
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [
|
||||
calctime,"五年晋级"]
|
||||
except:
|
||||
raise Exception(f"晋级、档起始或间隔时间格式错误:{row['晋级起始']}-{row['晋档起始']}-{row['晋级间隔']}-{row['晋档间隔']}")
|
||||
# 工资调标
|
||||
for rule in Rule_Level:
|
||||
if row["入职时间"] < rule["start"]:
|
||||
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [rule["start"], "工资调标"]
|
||||
History_pd["身份证号码"] = row["身份证号码"]
|
||||
History_pd["姓名"] = row["姓名"]
|
||||
History_pd["工龄"] = History_pd.apply(lambda x: calculate_seniority(row, x["变动后时间"].year), axis=1)
|
||||
History_pd["五年1级年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋级起始"].year, axis=1)
|
||||
History_pd["两年1档年份"] = History_pd.apply(lambda x: x["变动后时间"].year - row["晋档起始"].year, axis=1)
|
||||
History_pd = History_pd.sort_values(by="变动后时间").reset_index(drop=True)
|
||||
add_history(History_pd, row, promote)
|
||||
|
||||
History_pd = History_pd.sort_values(by="变动后时间").reset_index(drop=True)
|
||||
if History_pd.at[0,"变动后时间"] != row["入职时间"]:
|
||||
raise Exception(f"入职时间晚于其他时间:{row['入职时间']} < {History_pd.at[0,'变动后时间']} ({History_pd.at[0,'变动原因']})")
|
||||
|
||||
# 复杂数据计算
|
||||
for index, hrow in History_pd.iterrows():
|
||||
# 继承上一条复杂计算数据
|
||||
if index > 0:
|
||||
History_pd.at[index, "变动前时间"] = History_pd.at[index-1, "变动后时间"]
|
||||
History_pd.at[index, "变动前职务"] = History_pd.at[index-1, "变动后职务"]
|
||||
History_pd.at[index, "变动前级别档次"] = History_pd.at[index-1, "变动后级别档次"]
|
||||
History_pd.at[index, "变动前职务工资"] = History_pd.at[index-1, "变动后职务工资"]
|
||||
History_pd.at[index, "变动前级别工资"] = History_pd.at[index-1, "变动后级别工资"]
|
||||
History_pd.at[index, "变动前津贴工资"] = History_pd.at[index-1, "变动后津贴工资"]
|
||||
History_pd.at[index, "变动前工资合计"] = History_pd.at[index-1, "变动后工资合计"]
|
||||
# 继承名称
|
||||
if pd.isna(hrow["变动后职务"]):
|
||||
History_pd.at[index,"变动后职务"] = History_pd.at[index,"变动前职务"]
|
||||
# 名称变化
|
||||
for rule in Rule_RoleName:
|
||||
if rule["start"] <= hrow["变动后时间"] <= rule["end"]:
|
||||
if History_pd.at[index,"变动后职务"] in rule["rule"]["原名称"].values:
|
||||
History_pd.at[index,"变动后职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.at[index,"变动后职务"]]["现名称"].values[0]
|
||||
# 级别档次
|
||||
if index > 0:
|
||||
jb, dc = split_level(History_pd.at[index,"变动前级别档次"])
|
||||
if hrow["变动原因"] == "两年晋档":
|
||||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
|
||||
elif hrow["变动原因"] == "五年晋级":
|
||||
if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["变动后职务"]):
|
||||
|
||||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
|
||||
else:
|
||||
History_pd.at[index, "变动后级别档次"] = f"{jb-1}-{dc-1}"
|
||||
elif hrow["变动原因"] == "工资调标":
|
||||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}"
|
||||
elif hrow["变动原因"] == "晋升":
|
||||
role = History_pd.iloc[index]["变动后职务"]
|
||||
if role in Promote_Level.keys():
|
||||
new_jb = jb + Promote_Level[role][0]
|
||||
new_dc = dc + Promote_Level[role][1]
|
||||
if pd.isna(new_jb) or pd.isna(new_dc):
|
||||
raise Exception(f"级别档次计算出现NaN值:[{new_jb}]-[{new_dc}]({role})")
|
||||
else:
|
||||
new_jb = int(new_jb)
|
||||
new_dc = int(new_dc)
|
||||
if (History_pd.at[index,"变动后职务"] in Promote_verify.iloc[:,0].values and
|
||||
role in Promote_verify.iloc[:,1].values):
|
||||
logging.info(f"[{row['身份证号码']}]命中晋升校验规则[{History_pd.at[index,'变动前职务']}]->[{role}]")
|
||||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}"
|
||||
elif new_jb < role_limit(role):
|
||||
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
|
||||
elif new_jb < 1 or new_dc < 1:
|
||||
raise Exception(f"级别档次小于0:[{new_jb}]-[{new_dc}]")
|
||||
else:
|
||||
History_pd.at[index, "变动后级别档次"] = f"{new_jb}-{new_dc}"
|
||||
else:
|
||||
logging.warning(f"职位[{role}]不存在职级上限规则")
|
||||
else:
|
||||
History_pd.at[index, "变动后级别档次"] = History_pd.at[index, "变动前级别档次"]
|
||||
# 计算工资
|
||||
History_pd.at[index, "变动后职务工资"] = role_salary(History_pd.iloc[index]["变动后职务"], hrow["变动后时间"])
|
||||
History_pd.at[index, "变动后级别工资"] = level_salary(History_pd.iloc[index]["变动后级别档次"], hrow["变动后时间"])
|
||||
History_pd.at[index, "变动后工资合计"] = to_int(History_pd.iloc[index]["变动后职务工资"]) + to_int(History_pd.iloc[index]["变动后级别工资"])
|
||||
calc_history(History_pd, row)
|
||||
|
||||
fill_history_info(ws, History_pd)# 填充历史记录
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user