YansTool/main.py
mxr612 b0827c2829 refactor: 将花名册填充逻辑提取为独立函数
将main函数中的花名册填充逻辑提取为独立的fill_roster函数,提高代码可维护性和复用性
2025-06-12 21:51:02 +08:00

427 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
from openpyxl.utils import get_column_letter
from openpyxl import load_workbook
from datetime import datetime
from dateutil.relativedelta import relativedelta
import logging
from datetime import datetime
# 配置日志记录
logging.basicConfig(filename='log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 全局变量
## 常量
P_LIMIT = 6 # 最大晋升次数
P_START = 10 # 晋升记录开始行
H_START = 15 + P_LIMIT # 历史记录开始行
NOWTIME = datetime.now()
## 通过函数读取
BaseData = pd.DataFrame()
Promote = pd.DataFrame()
Rule_Role = []
Rule_Level = []
Rule_RoleName = []
Level_Limit = pd.DataFrame()
Promote_Level = pd.DataFrame()
Promote_verify = pd.DataFrame()
Allowance = []
## 统计量
max_promote = 0
max_history = 0
# 工具函数
def custom_date_parser(x):
try:
return datetime.strptime(x, '%Y-%m-%d')
except:
return x
def format_time(dt,info):
try:
return dt.strftime("%Y.%m")
except:
logging.warning(f"[{info}]时间格式错误:{dt}")
return dt
def to_int(x):
try:
return int(x)
except:
return 0
def fallback(x):
for i in x:
if pd.notna(i) and i != '':
return i
return ''
def split_level(level:str):
try:
parts = level.split('-')
return (int(parts[0]), int(parts[1]))
except:
raise Exception(f"职级[{level}]格式错误")
# 读取信息
def read_base_data(): # 读取员工数据
global BaseData
BaseData = pd.read_excel("原数据.xlsx", sheet_name="入职信息")
for col in ["出生年月","任职年月","原职时间","参加工作时间","入职时间", "晋档起始", "晋级起始", "日期2"]:
BaseData[col] = BaseData[col].apply(custom_date_parser)
for col in ["晋档起始", "晋级起始"]:
BaseData[col] = BaseData[col].apply(lambda x: datetime(x.year, 1, 1) if isinstance(x, datetime) else x)
BaseData["Latest_Role"] = None
BaseData["Latest_Prom"] = None
BaseData["工龄调增"] = BaseData["工龄调增"].apply(to_int)
BaseData["工龄调减"] = BaseData["工龄调减"].apply(to_int)
BaseData["学龄"] = BaseData["学龄"].apply(to_int)
BaseData["工龄"] = BaseData.apply(lambda row: NOWTIME.year-row["参加工作时间"].year+row["工龄调增"]-row["工龄调减"]+1, axis=1)
def read_promote(): # 读取晋升记录
global Promote
Promote = pd.read_excel("原数据.xlsx", sheet_name="职务变动")
for col in ["任职时间","工资执行时间"]:
Promote[col] = Promote[col].apply(custom_date_parser)
def read_rule_role(): # 读取职位规则
global Rule_Role
col = 4
while True:
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Rule_Role.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["role","salary"])
})
col += 2
except:
break
Rule_Role = sorted(Rule_Role, key=lambda x: x['start'])
def read_rule_level(): # 读取职级规则
global Rule_Level
col = 1
while True: # 职级规则
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Rule_Level.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["level","salary"])
})
col += 2
except:
break
Rule_Level = sorted(Rule_Level, key=lambda x: x['start'])
def read_rule_role_name(): # 读取名称变化规则
global Rule_RoleName
col = 1
while True: # 名称变化
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Rule_RoleName.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2)
})
col += 2
except:
break
Rule_RoleName = sorted(Rule_RoleName, key=lambda x: x['start'])
def read_level_limit(): # 读取职位对应的级别限制
global Level_Limit, Promote_Level
Level_Limit_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="A:A", skiprows=2, names=["limit"])
Promote_Level_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="B:C", skiprows=2, names=["级别","档次"])
Level_Limit = {}
Promote_Level = {}
for rule in Rule_Role:
for index, row in rule["rule"].iterrows():
Level_Limit[row["role"]] = Level_Limit_tmp.iloc[index]["limit"]
Promote_Level[row["role"]] = (Promote_Level_tmp.iloc[index]["级别"], Promote_Level_tmp.iloc[index]["档次"])
def read_promote_verify(): # 读取晋升校验
global Promote_verify
Promote_verify = pd.read_excel("原数据.xlsx", sheet_name="晋升校验", usecols="A:B")
def read_allowance(): # 读取津贴
global Allowance
col = 1
while True:
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="津贴规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Allowance.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="津贴规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["level","salary"])
})
col += 2
except:
break
Allowance = sorted(Allowance, key=lambda x: x['start'])
def load_people():
read_base_data()
read_promote()
logging.info("人员信息加载完成")
def load_rule():
read_rule_role()
read_rule_level()
read_rule_role_name()
read_level_limit()
read_promote_verify()
read_allowance()
logging.info("规则加载完成")
# 获取配置类函数
def role_salary(role:str, time):
for rule in Rule_Role:
if rule["start"] <= time <= rule["end"]:
try:
tmp = rule["rule"][rule["rule"]["role"] == role].iloc[0]
return tmp["salary"]
except:
if role == "":
logging.error("空职级")
else:
logging.warning(f"职位[{role}]在[{time}]时不存在工资规则")
return 0
def level_salary(level:str, time):
for rule in Rule_Level:
if rule["start"] <= time <= rule["end"]:
try:
tmp = rule["rule"][rule["rule"]["level"] == level].iloc[0]
return tmp["salary"]
except:
logging.warning(f"职级[{level}]在[{time}]时不存在工资规则")
return 0
def role_limit(role:str):
if role in Level_Limit.keys():
return Level_Limit[role]
else:
logging.warning(f"职位[{role}]不存在职级上限规则")
return -1
# 填充类辅助函数
def fill_basic_info(ws, row):# 填充基本信息
ws.cell(row=2, column=1, value=f"部门:{row['部门']} 职务:{row['职务']}")
ws.cell(row=3, column=2, value=row["姓名"])
ws.cell(row=3, column=4, value=row["性别"])
ws.cell(row=3, column=6, value=row["民族"])
ws.cell(row=5, column=2, value=row["现职"])
ws.cell(row=5, column=6, value=row["任职年限"])
ws.cell(row=6, column=2, value=row["原职"])
ws.cell(row=6, column=6, value=row["原职年限"])
ws.cell(row=7, column=2, value=row["学历"])
ws.cell(row=7, column=4, value=row["学龄"])
ws.cell(row=7, column=6, value=row["套改年限"])
ws.cell(row=7, column=7, value=row["职务工资"])
ws.cell(row=7, column=8, value=row["职务工资金额"])
ws.cell(row=7, column=9, value=row["级别工资"])
ws.cell(row=7, column=10, value=row["职务工资金额"])
ws.cell(row=17, column=1, value=row["个人评价结果"])
ws.cell(row=3, column=8, value=format_time(row["出生年月"],"出生年月"))
ws.cell(row=3, column=10, value=format_time(row["参加工作时间"],"参加工作时间"))
ws.cell(row=5, column=4, value=format_time(row["任职年月"],"任职年月"))
ws.cell(row=6, column=4, value=format_time(row["原职时间"],"原职时间"))
def fill_prompt_info(ws, promote):# 填充晋升信息
for index, prow in promote.iterrows():
if index > P_LIMIT-1:
logging.warning(f"超过[{P_LIMIT}]条晋升信息,共[{promote.shape[0]}]条。")
max_promote = max(max_promote, promote.shape[0])
break
ws.cell(row=P_START+index, column=1, value=format_time(prow["任职时间"],"晋升时间"))
ws.cell(row=P_START+index, column=2, value=prow["变动批注"])
ws.cell(row=P_START+index, column=3, value=""+prow["新职务"])
def fill_history_info(ws, History_pd):# 填充历史记录
for index, hrow in History_pd.iterrows(): # 打印
for col in range(1, 11): # 复制样式
ws.cell(row=H_START+index, column=col)._style = ws.cell(row=H_START, column=col)._style
try:
ws.cell(row=H_START+index, column=1, value=format_time(hrow["时间"],"历史时间"))
except:
logging.warning(f"历史时间格式错误:{hrow['时间']}")
ws.cell(row=H_START+index, column=2, value=hrow["职务"])
ws.cell(row=H_START+index, column=3, value=hrow["职务工资"])
ws.cell(row=H_START+index, column=4, value=hrow["级别档次"])
ws.cell(row=H_START+index, column=5, value=hrow["级别工资"])
ws.cell(row=H_START+index, column=6, value=hrow["工资合计"])
ws.cell(row=H_START+index, column=7, value=hrow["变动原因"])
# ws.cell(row=H_START+index, column=8, value=index) # Debug
def fill_roster(): # 填充花名册
wb = load_workbook("模板/汇总名册.xlsx")
ws = wb.active
for index, row in BaseData.iterrows(): # 汇总
try:
logging.info(f"汇总:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
for col in range(1,16):
ws.cell(row=6+index, column=col)._style = ws.cell(row=6, column=col)._style
ws.cell(row=6+index, column=1, value=index+1)
ws.cell(row=6+index, column=2, value=row["姓名"])
ws.cell(row=6+index, column=3, value=row["性别"])
ws.cell(row=6+index, column=4, value=format_time(row["出生年月"], "出生年月"))
ws.cell(row=6+index, column=5, value=format_time(row["参加工作时间"], "参加工作时间"))
ws.cell(row=6+index, column=6, value=fallback([row["现学历"],row["学历"]]))
ws.cell(row=6+index, column=7, value=row['工龄']+row["学龄"])
ws.cell(row=6+index, column=8, value=row['工龄'])
ws.cell(row=6+index, column=9, value=row["学龄"])
ws.cell(row=6+index, column=10, value=row["工龄调减"])
ws.cell(row=6+index, column=11, value=row["Latest_Role"])
ws.cell(row=6+index, column=12, value=format_time(row["Latest_Prom"], "Latest_Prom"))
ws.cell(row=6+index, column=13, value=row["职务2"])
ws.cell(row=6+index, column=14, value=format_time(row["日期2"], "日期2"))
except Exception as e:
logging.error(f"{row['身份证号码']}:{e}")
wb.save("汇总名册.xlsx") # 保存汇总
def main():
load_people()
load_rule()
# 创建一个空的DataFrame来存储所有历史记录
all_history = pd.DataFrame(columns=["身份证号码", "姓名", "时间", "职务", "职务工资", "级别档次", "级别工资", "工资合计", "变动原因", "晋升备注"])
for index, row in BaseData.iterrows():
try:
logging.info(f"台账:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
BaseData.at[index, "Latest_Role"] = row["初始职务"]
BaseData.at[index, "Latest_Prom"] = row["入职时间"]
wb = load_workbook("模板/个人台账.xlsx")
ws = wb.active
fill_basic_info(ws, row)# 填充基本信息
# 查找晋升信息
promote = Promote[Promote["身份证号"] == row["身份证号码"]]
if not promote.empty:
promote = promote.sort_values(by=["工资执行时间", "任职时间"], ascending=[False, False]).reset_index(drop=True)
BaseData.at[index, "Latest_Role"] = promote.iloc[0]["新职务"]
BaseData.at[index, "Latest_Prom"] = promote.iloc[0]["任职时间"]
# 把原职务取出来
if promote.shape[0] > 1:
BaseData.at[index, "职务2"] = promote.iloc[1]["新职务"]
BaseData.at[index, "日期2"] = promote.iloc[1]["任职时间"]
else:
BaseData.at[index, "职务2"] = row["初始职务"]
BaseData.at[index, "日期2"] = row["入职时间"]
promote = promote.sort_values(by=["工资执行时间", "任职时间"]).reset_index(drop=True)
fill_prompt_info(ws, promote)# 填充晋升信息
# 根据规则匹配职级薪资
History_pd = pd.DataFrame(columns=["时间", "职务", "职务工资", "级别档次", "级别工资", "工资合计", "变动原因", "晋升备注"])
# 添加入职记录
History_pd.loc[len(History_pd)] = [row["入职时间"], row["初始职务"], "", row["入职时的初始级别"], "", "", "套改/定级", ""]
for index, prow in promote.iterrows(): # 添加晋升记录
History_pd.loc[len(History_pd)] = [prow["工资执行时间"]+relativedelta(hours=prow["任职时间"].month,minutes=prow["任职时间"].day), prow["新职务"], "", "", "", "", "晋升", f"{prow['新职务']} {prow['变动批注'] if pd.notna(prow['变动批注']) else ''}"]
try:
calctime=row["晋档起始"] + relativedelta(minute=1)
while True: # 添加晋档记录
calctime += relativedelta(years=row["晋档间隔"])
if calctime > NOWTIME:
break
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "两年晋档", ""]
calctime=row["晋级起始"]
while True: # 添加晋级记录
calctime += relativedelta(years=row["晋级间隔"])
if calctime > NOWTIME:
break
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "五年晋级", ""]
except:
raise Exception(f"晋级、档起始或间隔时间格式错误:{row['晋级起始']}-{row['晋档起始']}-{row['晋级间隔']}-{row['晋档间隔']}")
for rule in Rule_Level: # 工资调标
if row["入职时间"] < rule["start"]:
History_pd.loc[len(History_pd)] = [rule["start"], "", "", "", "", "", "工资调标", ""]
History_pd = History_pd.sort_values(by="时间").reset_index(drop=True)
if History_pd.at[0,"时间"] != row["入职时间"]:
raise Exception(f"入职时间晚于其他时间:{row['入职时间']} < {History_pd.at[0,'时间']} ({History_pd.at[0,'变动原因']})")
for index, hrow in History_pd.iterrows(): # 数据计算
# 调整职务职级
if index > 0 and hrow["职务"] == "":
History_pd.at[index, "职务"] = History_pd.iloc[index-1]["职务"]
for rule in Rule_RoleName: # 名称变化
if rule["start"] <= hrow["时间"] <= rule["end"]:
if History_pd.iloc[index]["职务"] in rule["rule"]["原名称"].values:
History_pd.at[index, "职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.iloc[index]["职务"]]["现名称"].values[0]
if index > 0 and hrow["级别档次"] == "":
jb, dc = split_level(History_pd.iloc[index-1]["级别档次"])
if hrow["变动原因"] == "两年晋档":
History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}"
elif hrow["变动原因"] == "五年晋级":
if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["职务"]):
History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}"
else:
History_pd.at[index, "级别档次"] = f"{jb-1}-{dc-1}"
elif hrow["变动原因"] == "工资调标":
History_pd.at[index, "级别档次"] = f"{jb}-{dc}"
elif hrow["变动原因"] == "晋升":
role = History_pd.iloc[index]["职务"]
if role in Promote_Level.keys():
new_jb = jb + Promote_Level[role][0]
new_dc = dc + Promote_Level[role][1]
if pd.isna(new_jb) or pd.isna(new_dc):
print(Promote_Level[role][1])
raise Exception(f"级别档次计算出现NaN值[{new_jb}]-[{new_dc}]({role})")
else:
new_jb = int(new_jb)
new_dc = int(new_dc)
if (History_pd.iloc[index-1]["职务"] in Promote_verify.iloc[:,0].values and
role in Promote_verify.iloc[:,1].values):
logging.info(f"[{row['身份证号码']}]命中晋升校验规则[{History_pd.iloc[index-1]['职务']}]->[{role}]")
History_pd.at[index, "级别档次"] = f"{jb}-{dc}"
elif new_jb < role_limit(role):
History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}"
elif new_jb < 1 or new_dc < 1:
raise Exception(f"级别档次小于0[{new_jb}]-[{new_dc}]")
else:
History_pd.at[index, "级别档次"] = f"{new_jb}-{new_dc}"
else:
logging.warning(f"职位[{role}]不存在职级上限规则")
# 计算工资
History_pd.at[index, "职务工资"] = role_salary(History_pd.iloc[index]["职务"], hrow["时间"])
History_pd.at[index, "级别工资"] = level_salary(History_pd.iloc[index]["级别档次"], hrow["时间"])
History_pd.at[index, "工资合计"] = to_int(History_pd.iloc[index]["职务工资"]) + to_int(History_pd.iloc[index]["级别工资"])
fill_history_info(ws, History_pd)# 填充历史记录
# 将当前人员的历史记录添加到总表中
History_pd["身份证号码"] = row["身份证号码"]
History_pd["姓名"] = row["姓名"]
all_history = pd.concat([all_history, History_pd], ignore_index=True)
wb.save(f"个人台账/{row['身份证号码']}_{row['姓名']}.xlsx")
except Exception as e:
logging.error(f"{row['身份证号码']}:{e}")
# 保存所有历史记录到Excel文件
all_history["时间"] = all_history["时间"].apply(lambda x: format_time(x, "历史记录时间"))
all_history.to_excel("所有人员历史记录.xlsx", index=False)
logging.info("所有人员历史记录已保存到'所有人员历史记录.xlsx'")
fill_roster()
if max_promote > 0:
logging.warning(f"最多有[{max_promote}]条晋升信息,需要调整模板。记得同时调整薪资历史的起始行和个人评价结果。")
if max_history > 0:
logging.warning(f"最多有[{max_history}]条薪资历史,需要调整模板。")
if __name__ == "__main__":
main()