YansTool/main.py
mxr612 758047085c refactor: 优化数据处理逻辑和文档说明
- 移除首行跳过规则
- 更新使用说明文档,增加关于晋级晋档起始时间的运行规则说明
2025-06-02 16:28:18 +08:00

306 lines
14 KiB
Python

import pandas as pd
from openpyxl.utils import get_column_letter
from openpyxl import load_workbook
from datetime import datetime
from dateutil.relativedelta import relativedelta
import logging
from datetime import datetime
# 配置日志记录
logging.basicConfig(filename='log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
P_LIMIT = 6 # 最大晋升次数
P_START = 10 # 晋升记录开始行
H_START = 15 + P_LIMIT # 历史记录开始行
# 自定义日期解析函数
def custom_date_parser(x):
try:
return datetime.strptime(x, '%Y-%m-%d')
except:
return x
BaseData = pd.read_excel("原数据.xlsx", sheet_name="入职信息")
Promote = pd.read_excel("原数据.xlsx", sheet_name="职务变动") #
Level_Limit = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols="A:B", skiprows=2, names=["limit","role"])
for index, row in BaseData.iterrows():
for col in ["出生年月","任职年月","原职时间","参加工作时间","入职时间", "二档起始", "五档起始", "日期2"]:
BaseData.at[index, col] = custom_date_parser(row[col])
for index, row in Promote.iterrows():
for col in ["任职时间","工资执行时间"]:
Promote.at[index, col] = custom_date_parser(row[col])
logging.info("人员信息加载完成")
Rule_Role = []
col = 2
while True: # 职位规则
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Rule_Role.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["role","salary"])
})
col += 2
except:
break
Rule_Level = []
col = 1
while True: # 职级规则
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Rule_Level.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["level","salary"])
})
col += 2
except:
break
Rule_RoleName = []
col = 1
while True: # 名称变化
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Rule_RoleName.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2)
})
col += 2
except:
break
logging.info("规则加载完成")
Rule_Role = sorted(Rule_Role, key=lambda x: x['start'])
Rule_Level = sorted(Rule_Level, key=lambda x: x['start'])
Rule_RoleName = sorted(Rule_RoleName, key=lambda x: x['start'])
nowtime = datetime.now()
def split_level(level:str):
try:
parts = level.split('-')
return (int(parts[0]), int(parts[1]))
except:
logging.warning(f"职级[{level}]格式错误")
return (0, 0)
def role_salary(role:str, time):
for rule in Rule_Role:
if rule["start"] <= time <= rule["end"]:
try:
tmp = rule["rule"][rule["rule"]["role"] == role].iloc[0]
return tmp["salary"]
except:
if role == "":
logging.error("空职级")
else:
logging.warning(f"职位[{role}]在[{time}]时不存在工资规则")
return 0
def level_salary(level:str, time):
for rule in Rule_Level:
if rule["start"] <= time <= rule["end"]:
try:
tmp = rule["rule"][rule["rule"]["level"] == level].iloc[0]
return tmp["salary"]
except:
logging.warning(f"职级[{level}]在[{time}]时不存在工资规则")
return 0
def role_limit(role:str):
try:
tmp = Level_Limit[Level_Limit["role"] == role].iloc[0]
return tmp["limit"]
except:
logging.warning(f"职位[{role}]不存在职级上限规则")
return -1
def format_time(dt,info):
try:
return dt.strftime("%Y.%m")
except:
logging.warning(f"[{info}]时间格式错误:{dt}")
return dt
def to_int(x):
try:
return int(x)
except:
return 0
BaseData["工龄调增"] = BaseData["工龄调增"].apply(to_int)
BaseData["工龄调减"] = BaseData["工龄调减"].apply(to_int)
max_promote = 0
max_history = 0
def fill_basic_info(ws, row):# 填充基本信息
ws.cell(row=2, column=1, value=f"部门:{row['部门']} 职务:{row['职务']}")
ws.cell(row=3, column=2, value=row["姓名"])
ws.cell(row=3, column=4, value=row["性别"])
ws.cell(row=3, column=6, value=row["民族"])
ws.cell(row=5, column=2, value=row["现职"])
ws.cell(row=5, column=6, value=row["任职年限"])
ws.cell(row=6, column=2, value=row["原职"])
ws.cell(row=6, column=6, value=row["原职年限"])
ws.cell(row=7, column=2, value=row["学历"])
ws.cell(row=7, column=4, value=row["学龄"])
ws.cell(row=7, column=6, value=row["套改年限"])
ws.cell(row=7, column=7, value=row["职务工资"])
ws.cell(row=7, column=8, value=row["职务工资金额"])
ws.cell(row=7, column=9, value=row["级别工资"])
ws.cell(row=7, column=10, value=row["职务工资金额"])
ws.cell(row=17, column=1, value=row["个人评价结果"])
ws.cell(row=3, column=8, value=format_time(row["出生年月"],"出生年月"))
ws.cell(row=3, column=10, value=format_time(row["参加工作时间"],"参加工作时间"))
ws.cell(row=5, column=4, value=format_time(row["任职年月"],"任职年月"))
ws.cell(row=6, column=4, value=format_time(row["原职时间"],"原职时间"))
def fill_prompt_info(ws, promote):# 填充晋升信息
for index, prow in promote.iterrows():
if index > P_LIMIT-1:
logging.warning(f"超过[{P_LIMIT}]条晋升信息,共[{promote.shape[0]}]条。")
max_promote = max(max_promote, promote.shape[0])
break
ws.cell(row=P_START+index, column=1, value=format_time(prow["任职时间"],"晋升时间"))
ws.cell(row=P_START+index, column=2, value=prow["变动批注"])
ws.cell(row=P_START+index, column=3, value=""+prow["新职务"])
def fill_history_info(ws, History_pd):# 填充历史记录
for index, hrow in History_pd.iterrows(): # 打印
for col in range(1, 11): # 复制样式
ws.cell(row=H_START+index, column=col)._style = ws.cell(row=H_START, column=col)._style
try:
ws.cell(row=H_START+index, column=1, value=format_time(hrow["时间"],"历史时间"))
except:
logging.warning(f"历史时间格式错误:{hrow['时间']}")
ws.cell(row=H_START+index, column=2, value=hrow["职务"])
ws.cell(row=H_START+index, column=3, value=hrow["工资额1"])
ws.cell(row=H_START+index, column=4, value=hrow["级别档次"])
ws.cell(row=H_START+index, column=5, value=hrow["工资额2"])
ws.cell(row=H_START+index, column=6, value=hrow["工资合计"])
ws.cell(row=H_START+index, column=7, value=hrow["变动原因"])
# ws.cell(row=H_START+index, column=8, value=index) # Debug
BaseData["Latest_Role"] = None
BaseData["Latest_Prom"] = None
for index, row in BaseData.iterrows():
try:
logging.info(f"台账:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
BaseData.at[index, "Latest_Role"] = row["初始职务"]
BaseData.at[index, "Latest_Prom"] = row["入职时间"]
wb = load_workbook("模板/个人台账.xlsx")
ws = wb.active
fill_basic_info(ws, row)# 填充基本信息
# 查找晋升信息
promote = Promote[Promote["身份证号"] == row["身份证号码"]]
if not promote.empty:
promote = promote.sort_values(by="任职时间", ascending=False).reset_index(drop=True)
BaseData.at[index, "Latest_Role"] = promote.iloc[0]["新职务"]
BaseData.at[index, "Latest_Prom"] = promote.iloc[0]["任职时间"]
# 把原职务取出来
if promote.shape[0] > 1:
BaseData.at[index, "职务2"] = promote.iloc[1]["新职务"]
BaseData.at[index, "日期2"] = promote.iloc[1]["任职时间"]
promote = promote.sort_values(by="任职时间").reset_index(drop=True)
fill_prompt_info(ws, promote)# 填充晋升信息
# 根据规则匹配职级薪资
History_pd = pd.DataFrame(columns=["时间", "职务", "工资额1", "级别档次", "工资额2", "工资合计", "变动原因"])
# 添加入职记录
History_pd.loc[len(History_pd)] = [row["入职时间"], row["初始职务"], "", row["入职时的初始级别"], "", "", "套改/定级"]
for index, prow in promote.iterrows(): # 添加晋升记录
History_pd.loc[len(History_pd)] = [prow["工资执行时间"], prow["新职务"], "", "", "", "", "晋升"]
try:
calctime=row["二档起始"] + relativedelta(minute=1)
while True: # 添加二级记录
calctime += relativedelta(years=2)
if calctime > nowtime:
break
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "两年晋档"]
calctime=row["五档起始"]
while True: # 添加五级记录
calctime += relativedelta(years=5)
if calctime > nowtime:
break
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "五年晋级"]
except:
raise Exception(f"二、五档起始时间格式错误:{row['二档起始']}{row['五档起始']}")
for rule in Rule_Level: # 工资调标
if row["入职时间"] < rule["start"]:
History_pd.loc[len(History_pd)] = [rule["start"], "", "", "", "", "", "工资调标"]
History_pd = History_pd.sort_values(by="时间").reset_index(drop=True)
if History_pd.at[0,"时间"] != row["入职时间"]:
raise Exception(f"入职时间晚于其他计算的时间:{row['入职时间']} < {History_pd.at[0,'时间']} ({History_pd.at[0,'变动原因']})")
for index, hrow in History_pd.iterrows(): # 数据计算
# 调整职务职级
if index > 0 and hrow["职务"] == "":
History_pd.at[index, "职务"] = History_pd.iloc[index-1]["职务"]
for rule in Rule_RoleName: # 名称变化
if rule["start"] <= hrow["时间"] <= rule["end"]:
if History_pd.iloc[index]["职务"] in rule["rule"]["原名称"].values:
History_pd.at[index, "职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.iloc[index]["职务"]]["现名称"].values[0]
if index > 0 and hrow["级别档次"] == "":
jb, dc = split_level(History_pd.iloc[index-1]["级别档次"])
if hrow["变动原因"] == "两年晋档":
History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}"
elif hrow["变动原因"] == "五年晋级":
History_pd.at[index, "级别档次"] = f"{jb-1}-{dc-1}"
elif hrow["变动原因"] == "工资调标":
History_pd.at[index, "级别档次"] = f"{jb}-{dc}"
elif hrow["变动原因"] == "晋升":
if role_limit(History_pd.iloc[index]["职务"]) == 99: # 99tag
History_pd.at[index, "级别档次"] = f"{jb}-{dc}"
elif jb == role_limit(History_pd.iloc[index]["职务"]):
History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}"
else:
History_pd.at[index, "级别档次"] = f"{jb-1}-{dc-1}"
# 计算工资
History_pd.at[index, "工资额1"] = role_salary(History_pd.iloc[index]["职务"], hrow["时间"])
History_pd.at[index, "工资额2"] = level_salary(History_pd.iloc[index]["级别档次"], hrow["时间"])
History_pd.at[index, "工资合计"] = to_int(History_pd.iloc[index]["工资额1"]) + to_int(History_pd.iloc[index]["工资额2"])
fill_history_info(ws, History_pd)# 填充历史记录
wb.save(f"个人台账/{row['身份证号码']}_{row['姓名']}.xlsx")
except Exception as e:
logging.error(f"{row['身份证号码']}:{e}")
wb = load_workbook("模板/汇总名册.xlsx")
ws = wb.active
for index, row in BaseData.iterrows(): # 汇总
try:
logging.info(f"汇总:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
for col in range(1,16):
ws.cell(row=6+index, column=col)._style = ws.cell(row=6, column=col)._style
ws.cell(row=6+index, column=1, value=index+1)
ws.cell(row=6+index, column=2, value=row["姓名"])
ws.cell(row=6+index, column=3, value=row["性别"])
ws.cell(row=6+index, column=4, value=format_time(row["出生年月"], "出生年月"))
ws.cell(row=6+index, column=5, value=format_time(row["参加工作时间"], "参加工作时间"))
ws.cell(row=6+index, column=6, value=row["学历"])
ws.cell(row=6+index, column=7, value=nowtime.year-row["入职时间"].year+row["工龄调增"]-row["工龄调减"]+1)
ws.cell(row=6+index, column=8, value=nowtime.year-row["入职时间"].year)
ws.cell(row=6+index, column=9, value=row["工龄调增"])
ws.cell(row=6+index, column=10, value=row["工龄调减"])
ws.cell(row=6+index, column=11, value=row["Latest_Role"])
ws.cell(row=6+index, column=12, value=format_time(row["Latest_Prom"], "Latest_Prom"))
ws.cell(row=6+index, column=13, value=row["职务2"])
ws.cell(row=6+index, column=14, value=format_time(row["日期2"], "日期2"))
except Exception as e:
logging.error(f"{row['身份证号码']}:{e}")
wb.save("汇总名册.xlsx") # 保存汇总
if max_promote > 0:
logging.warning(f"最多有[{max_promote}]条晋升信息,需要调整模板。记得同时调整薪资历史的起始行和个人评价结果。")
if max_history > 0:
logging.warning(f"最多有[{max_history}]条薪资历史,需要调整模板。")