YansTool/main.py
mxr612 2eb841639e feat: 添加format_time_ymd函数并更新历史记录时间格式
新增format_time_ymd函数用于将时间格式化为"年.月.日"形式
修改main函数中使用format_time_ymd替代原format_time处理历史记录时间
2025-06-14 04:26:35 +08:00

476 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
from openpyxl.utils import get_column_letter
from openpyxl import load_workbook
from datetime import datetime
from dateutil.relativedelta import relativedelta
import logging, os
# 配置日志记录
logging.basicConfig(filename='log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 全局变量
## 常量
P_LIMIT = 6 # 最大晋升次数
P_START = 10 # 晋升记录开始行
H_START = 15 + P_LIMIT # 历史记录开始行
NOWTIME = datetime.now()
## 通过函数读取
BaseData = pd.DataFrame()
Promote = pd.DataFrame()
Rule_Role = []
Rule_Level = []
Rule_RoleName = []
Level_Limit = pd.DataFrame()
Promote_Level = pd.DataFrame()
Promote_verify = pd.DataFrame()
Allowance = []
## 统计量
max_promote = 0
max_history = 0
# 工具函数
def custom_date_parser(x):
try:
return datetime.strptime(x, '%Y-%m-%d')
except:
return x
def format_time(dt,info):
try:
return dt.strftime("%Y.%m")
except:
logging.warning(f"[{info}]时间格式错误:{dt}")
return dt
def format_time_ymd(dt,info):
try:
return dt.strftime("%Y.%m.%d")
except:
logging.warning(f"[{info}]时间格式错误:{dt}")
return dt
def to_int(x):
try:
return int(x)
except:
return 0
def fallback(x):
for i in x:
if pd.notna(i) and i != '':
return i
return ''
def split_level(level:str):
try:
parts = level.split('-')
return (int(parts[0]), int(parts[1]))
except:
raise Exception(f"职级[{level}]格式错误")
def calculate_seniority(row, year):
return year - row["参加工作时间"].year + row["工龄调增"] - row["工龄调减"] + 1
# 读取信息
def read_base_data(): # 读取员工数据
global BaseData
BaseData = pd.read_excel("原数据.xlsx", sheet_name="入职信息")
for col in ["出生年月","任职年月","原职时间","参加工作时间","入职时间", "晋档起始", "晋级起始", "日期2"]:
BaseData[col] = BaseData[col].apply(custom_date_parser)
for col in ["晋档起始", "晋级起始"]:
BaseData[col] = BaseData[col].apply(lambda x: datetime(x.year, 1, 1) if isinstance(x, datetime) else x)
BaseData["Latest_Role"] = None
BaseData["Latest_Prom"] = None
BaseData["工龄调增"] = BaseData["工龄调增"].apply(to_int)
BaseData["工龄调减"] = BaseData["工龄调减"].apply(to_int)
BaseData["学龄"] = BaseData["学龄"].apply(to_int)
BaseData["工龄"] = BaseData.apply(lambda row: NOWTIME.year-row["参加工作时间"].year+row["工龄调增"]-row["工龄调减"]+1, axis=1)
def read_promote(): # 读取晋升记录
global Promote
Promote = pd.read_excel("原数据.xlsx", sheet_name="职务变动")
for col in ["任职时间","工资执行时间"]:
Promote[col] = Promote[col].apply(custom_date_parser)
def read_rule_role(): # 读取职位规则
global Rule_Role
col = 4
while True:
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Rule_Role.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["role","salary"])
})
col += 2
except:
break
Rule_Role = sorted(Rule_Role, key=lambda x: x['start'])
def read_rule_level(): # 读取职级规则
global Rule_Level
col = 1
while True: # 职级规则
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Rule_Level.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["level","salary"])
})
col += 2
except:
break
Rule_Level = sorted(Rule_Level, key=lambda x: x['start'])
def read_rule_role_name(): # 读取名称变化规则
global Rule_RoleName
col = 1
while True: # 名称变化
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Rule_RoleName.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2)
})
col += 2
except:
break
Rule_RoleName = sorted(Rule_RoleName, key=lambda x: x['start'])
def read_level_limit(): # 读取职位对应的级别限制
global Level_Limit, Promote_Level
Level_Limit_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="A:A", skiprows=2, names=["limit"])
Promote_Level_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="B:C", skiprows=2, names=["级别","档次"])
Level_Limit = {}
Promote_Level = {}
for rule in Rule_Role:
for index, row in rule["rule"].iterrows():
Level_Limit[row["role"]] = Level_Limit_tmp.iloc[index]["limit"]
Promote_Level[row["role"]] = (Promote_Level_tmp.iloc[index]["级别"], Promote_Level_tmp.iloc[index]["档次"])
def read_promote_verify(): # 读取晋升校验
global Promote_verify
Promote_verify = pd.read_excel("原数据.xlsx", sheet_name="晋升校验", usecols="A:B")
def read_allowance(): # 读取津贴
global Allowance
col = 1
while True:
try:
rule = pd.read_excel("原数据.xlsx", sheet_name="津贴规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
Allowance.append({
"start":rule.iloc[0,1],
"end":rule.iloc[1,1],
"rule":pd.read_excel("原数据.xlsx", sheet_name="津贴规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["level","salary"])
})
col += 2
except:
break
Allowance = sorted(Allowance, key=lambda x: x['start'])
def load_people():
read_base_data()
read_promote()
logging.info("人员信息加载完成")
def load_rule():
read_rule_role()
read_rule_level()
read_rule_role_name()
read_level_limit()
read_promote_verify()
read_allowance()
logging.info("规则加载完成")
# 获取配置类函数
def role_salary(role:str, time):
for rule in Rule_Role:
if rule["start"] <= time <= rule["end"]:
try:
tmp = rule["rule"][rule["rule"]["role"] == role].iloc[0]
return tmp["salary"]
except:
if role == "":
logging.error("空职级")
else:
logging.warning(f"职位[{role}]在[{time}]时不存在工资规则")
logging.warning(f"时间[{time}]时不存在职位工资规则")
return 0
def level_salary(level:str, time):
for rule in Rule_Level:
if rule["start"] <= time <= rule["end"]:
try:
tmp = rule["rule"][rule["rule"]["level"] == level].iloc[0]
return tmp["salary"]
except:
logging.warning(f"职级[{level}]在[{time}]时不存在工资规则")
logging.warning(f"时间[{time}]时不存在职级工资规则")
return 0
def role_limit(role:str):
if role in Level_Limit.keys():
return Level_Limit[role]
else:
logging.warning(f"职位[{role}]不存在职级上限规则")
return -1
def allowance(role:str, level:int, time):
for rule in Allowance:
if rule["start"] <= time <= rule["end"]:
try:
tmp = rule["rule"][rule["rule"]["level"] == f"{role}-{level}"].iloc[0]
return tmp["salary"]
except:
logging.warning(f"组合[{role}-{level}]在[{time}]时不存在工资规则")
logging.warning(f"时间[{time}]时不存在津贴规则")
raise 0
# 填充类辅助函数
def fill_basic_info(ws, row):# 填充基本信息
ws.cell(row=2, column=1, value=f"部门:{row['部门']} 职务:{row['职务']}")
ws.cell(row=3, column=2, value=row["姓名"])
ws.cell(row=3, column=4, value=row["性别"])
ws.cell(row=3, column=6, value=row["民族"])
ws.cell(row=5, column=2, value=row["现职"])
ws.cell(row=5, column=6, value=row["任职年限"])
ws.cell(row=6, column=2, value=row["原职"])
ws.cell(row=6, column=6, value=row["原职年限"])
ws.cell(row=7, column=2, value=row["学历"])
ws.cell(row=7, column=4, value=row["学龄"])
ws.cell(row=7, column=6, value=row["套改年限"])
ws.cell(row=7, column=7, value=row["职务工资"])
ws.cell(row=7, column=8, value=row["职务工资金额"])
ws.cell(row=7, column=9, value=row["级别工资"])
ws.cell(row=7, column=10, value=row["职务工资金额"])
ws.cell(row=17, column=1, value=row["个人评价结果"])
ws.cell(row=3, column=8, value=format_time(row["出生年月"],"出生年月"))
ws.cell(row=3, column=10, value=format_time(row["参加工作时间"],"参加工作时间"))
ws.cell(row=5, column=4, value=format_time(row["任职年月"],"任职年月"))
ws.cell(row=6, column=4, value=format_time(row["原职时间"],"原职时间"))
def fill_prompt_info(ws, promote):# 填充晋升信息
for index, prow in promote.iterrows():
if index > P_LIMIT-1:
logging.warning(f"超过[{P_LIMIT}]条晋升信息,共[{promote.shape[0]}]条。")
max_promote = max(max_promote, promote.shape[0])
break
ws.cell(row=P_START+index, column=1, value=format_time(prow["任职时间"],"晋升时间"))
ws.cell(row=P_START+index, column=2, value=prow["变动批注"])
ws.cell(row=P_START+index, column=3, value=""+prow["新职务"])
def fill_history_info(ws, History_pd):# 填充历史记录
for index, hrow in History_pd.iterrows(): # 打印
for col in range(1, 11): # 复制样式
ws.cell(row=H_START+index, column=col)._style = ws.cell(row=H_START, column=col)._style
try:
ws.cell(row=H_START+index, column=1, value=format_time(hrow["变动后时间"],"历史时间"))
except:
logging.warning(f"历史时间格式错误:{hrow['时间']}")
ws.cell(row=H_START+index, column=2, value=hrow["变动后职务"])
ws.cell(row=H_START+index, column=3, value=hrow["变动后职务工资"])
ws.cell(row=H_START+index, column=4, value=hrow["变动后级别档次"])
ws.cell(row=H_START+index, column=5, value=hrow["变动后级别工资"])
ws.cell(row=H_START+index, column=6, value=hrow["变动后工资合计"])
ws.cell(row=H_START+index, column=7, value=hrow["变动原因"])
# ws.cell(row=H_START+index, column=8, value=index) # Debug
def fill_roster(): # 填充花名册
wb = load_workbook("模板/汇总名册.xlsx")
ws = wb.active
for index, row in BaseData.iterrows(): # 汇总
try:
logging.info(f"汇总:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
for col in range(1,16):
ws.cell(row=6+index, column=col)._style = ws.cell(row=6, column=col)._style
ws.cell(row=6+index, column=1, value=index+1)
ws.cell(row=6+index, column=2, value=row["姓名"])
ws.cell(row=6+index, column=3, value=row["性别"])
ws.cell(row=6+index, column=4, value=format_time(row["出生年月"], "出生年月"))
ws.cell(row=6+index, column=5, value=format_time(row["参加工作时间"], "参加工作时间"))
ws.cell(row=6+index, column=6, value=fallback([row["现学历"],row["学历"]]))
ws.cell(row=6+index, column=7, value=row['工龄']+row["学龄"])
ws.cell(row=6+index, column=8, value=row['工龄'])
ws.cell(row=6+index, column=9, value=row["学龄"])
ws.cell(row=6+index, column=10, value=row["工龄调减"])
ws.cell(row=6+index, column=11, value=row["Latest_Role"])
ws.cell(row=6+index, column=12, value=format_time(row["Latest_Prom"], "Latest_Prom"))
ws.cell(row=6+index, column=13, value=row["职务2"])
ws.cell(row=6+index, column=14, value=format_time(row["日期2"], "日期2"))
except Exception as e:
logging.error(f"{row['身份证号码']}:{e}")
wb.save("汇总名册.xlsx") # 保存汇总
def main():
load_people()
load_rule()
# 创建一个空的DataFrame来存储所有历史记录
all_history = pd.DataFrame(columns=[
"身份证号码", "姓名", "工龄","变动原因", "晋升备注",
"变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资",
"变动后时间", "变动后职务", "变动后级别档次", "变动后职务工资", "变动后级别工资", "变动后津贴工资",
"五年1级年份", "两年1档年份"])
for index, row in BaseData.iterrows():
try:
logging.info(f"台账:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
BaseData.at[index, "Latest_Role"] = row["初始职务"]
BaseData.at[index, "Latest_Prom"] = row["入职时间"]
wb = load_workbook("模板/个人台账.xlsx")
ws = wb.active
fill_basic_info(ws, row)# 填充基本信息
# 查找晋升信息
promote = Promote[Promote["身份证号"] == row["身份证号码"]]
if not promote.empty:
promote = promote.sort_values(by=["工资执行时间", "任职时间"], ascending=[False, False]).reset_index(drop=True)
BaseData.at[index, "Latest_Role"] = promote.iloc[0]["新职务"]
BaseData.at[index, "Latest_Prom"] = promote.iloc[0]["任职时间"]
# 把原职务取出来
if promote.shape[0] > 1:
BaseData.at[index, "职务2"] = promote.iloc[1]["新职务"]
BaseData.at[index, "日期2"] = promote.iloc[1]["任职时间"]
else:
BaseData.at[index, "职务2"] = row["初始职务"]
BaseData.at[index, "日期2"] = row["入职时间"]
promote = promote.sort_values(by=["工资执行时间", "任职时间"]).reset_index(drop=True)
fill_prompt_info(ws, promote)# 填充晋升信息
# 根据规则匹配职级薪资
History_pd = pd.DataFrame(columns=[
"身份证号码", "姓名", # 统一填入
"变动后时间", "变动后职务", "变动原因", "晋升备注", # 直接填入
"工龄", "五年1级年份", "两年1档年份", # 简单计算更新
"变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资", "变动前工资合计", # 排序后更新
"变动后级别档次", "变动后职务工资", "变动后级别工资", "变动后津贴工资", "变动后工资合计"]) # 复杂计算更新
# 添加入职记录
History_pd.loc[len(History_pd), ["变动后时间","变动后职务","变动原因","变动后级别档次"]] = [
row["入职时间"],row["初始职务"],"套改/定级",row["入职时的初始级别"]]
for index, prow in promote.iterrows(): # 添加晋升记录
History_pd.loc[len(History_pd),["变动后时间","变动后职务","变动原因","晋升备注"]] = [
prow["工资执行时间"]+relativedelta(hours=prow["任职时间"].month,minutes=prow["任职时间"].day),
prow["新职务"],"晋升",f"{prow['新职务']} {prow['变动批注'] if pd.notna(prow['变动批注']) else ''}"]
try:
# 添加晋档记录
calctime=row["晋档起始"] + relativedelta(minute=1)
while True:
calctime += relativedelta(years=row["晋档间隔"])
if calctime > NOWTIME:
break
History_pd.loc[len(History_pd),["变动后时间","变动原因","五年1级年份","两年1档年份"]] = [
calctime,"两年晋档",calctime.year-row["晋级起始"].year,calctime.year-row["晋档起始"].year]
calctime=row["晋级起始"]
# 添加晋级记录
while True:
calctime += relativedelta(years=row["晋级间隔"])
if calctime > NOWTIME:
break
History_pd.loc[len(History_pd),["变动后时间","变动原因","五年1级年份","两年1档年份"]] = [
calctime,"五年晋级",calctime.year-row["晋级起始"].year,calctime.year-row["晋档起始"].year]
except:
raise Exception(f"晋级、档起始或间隔时间格式错误:{row['晋级起始']}-{row['晋档起始']}-{row['晋级间隔']}-{row['晋档间隔']}")
# 工资调标
for rule in Rule_Level:
if row["入职时间"] < rule["start"]:
History_pd.loc[len(History_pd),["变动后时间","变动原因"]] = [rule["start"], "工资调标"]
History_pd["身份证号码"] = row["身份证号码"]
History_pd["姓名"] = row["姓名"]
History_pd["工龄"] = History_pd.apply(lambda x: calculate_seniority(row, x["变动后时间"]), axis=1)
History_pd = History_pd.sort_values(by="时间").reset_index(drop=True)
if History_pd.at[0,"时间"] != row["入职时间"]:
raise Exception(f"入职时间晚于其他时间:{row['入职时间']} < {History_pd.at[0,'时间']} ({History_pd.at[0,'变动原因']})")
# 复杂数据计算
for index, hrow in History_pd.iterrows():
# 继承上一条复杂计算数据
if index > 0:
History_pd.loc[index,["变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资", "变动前工资合计"]] = History_pd.loc[index - 1,["变动前时间", "变动前职务", "变动前级别档次", "变动前职务工资", "变动前级别工资", "变动前津贴工资", "变动前工资合计"]]
# 继承名称
if hrow["变动后职务"] == "":
History_pd.at[index,"变动后职务"] = History_pd.at[index,"变动前职务"]
# 名称变化
for rule in Rule_RoleName:
if rule["start"] <= hrow["变动后时间"] <= rule["end"]:
if History_pd.at[index,"变动后职务"] in rule["rule"]["原名称"].values:
History_pd.at[index,"变动后职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.at[index,"变动后职务"]]["现名称"].values[0]
# 级别档次
if index > 0:
jb, dc = split_level(History_pd.at[index,"变动前级别档次"])
if hrow["变动原因"] == "两年晋档":
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
elif hrow["变动原因"] == "五年晋级":
if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["职务"]):
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
else:
History_pd.at[index, "变动后级别档次"] = f"{jb-1}-{dc-1}"
elif hrow["变动原因"] == "工资调标":
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}"
elif hrow["变动原因"] == "晋升":
role = History_pd.iloc[index]["变动后职务"]
if role in Promote_Level.keys():
new_jb = jb + Promote_Level[role][0]
new_dc = dc + Promote_Level[role][1]
if pd.isna(new_jb) or pd.isna(new_dc):
print(Promote_Level[role][1])
raise Exception(f"级别档次计算出现NaN值[{new_jb}]-[{new_dc}]({role})")
else:
new_jb = int(new_jb)
new_dc = int(new_dc)
if (History_pd.at[index,"变动后职务"] in Promote_verify.iloc[:,0].values and
role in Promote_verify.iloc[:,1].values):
logging.info(f"[{row['身份证号码']}]命中晋升校验规则[{History_pd.at[index,'变动前职务']}]->[{role}]")
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc}"
elif new_jb < role_limit(role):
History_pd.at[index, "变动后级别档次"] = f"{jb}-{dc+1}"
elif new_jb < 1 or new_dc < 1:
raise Exception(f"级别档次小于0[{new_jb}]-[{new_dc}]")
else:
History_pd.at[index, "变动后级别档次"] = f"{new_jb}-{new_dc}"
else:
logging.warning(f"职位[{role}]不存在职级上限规则")
else:
History_pd.at[index, "变动后级别档次"] = History_pd.at[index, "变动前级别档次"]
# 计算工资
History_pd.at[index, "变动后职务工资"] = role_salary(History_pd.iloc[index]["职务"], hrow["时间"])
History_pd.at[index, "变动后级别工资"] = level_salary(History_pd.iloc[index]["级别档次"], hrow["时间"])
History_pd.at[index, "变动后工资合计"] = to_int(History_pd.iloc[index]["职务工资"]) + to_int(History_pd.iloc[index]["级别工资"])
fill_history_info(ws, History_pd)# 填充历史记录
# 将当前人员的历史记录添加到总表中
all_history = pd.concat([all_history, History_pd], ignore_index=True)
wb.save(f"个人台账/{row['身份证号码']}_{row['姓名']}.xlsx")
except Exception as e:
logging.error(f"{row['身份证号码']}:{e}")
# 保存所有历史记录到Excel文件
all_history["时间"] = all_history["时间"].apply(lambda x: format_time_ymd(x, "历史记录时间"))
all_history.to_excel("所有人员历史记录.xlsx", index=False)
logging.info("所有人员历史记录已保存到'所有人员历史记录.xlsx'")
fill_roster()
if max_promote > 0:
logging.warning(f"最多有[{max_promote}]条晋升信息,需要调整模板。记得同时调整薪资历史的起始行和个人评价结果。")
if max_history > 0:
logging.warning(f"最多有[{max_history}]条薪资历史,需要调整模板。")
if __name__ == "__main__":
main()