312 lines
14 KiB
Python
312 lines
14 KiB
Python
import pandas as pd
|
|
from openpyxl.utils import get_column_letter
|
|
from openpyxl import load_workbook
|
|
from datetime import datetime
|
|
from dateutil.relativedelta import relativedelta
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
# 配置日志记录
|
|
logging.basicConfig(filename='log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
P_LIMIT = 6 # 最大晋升次数
|
|
|
|
P_START = 10 # 晋升记录开始行
|
|
H_START = 15 + P_LIMIT # 历史记录开始行
|
|
|
|
# 自定义日期解析函数
|
|
def custom_date_parser(x):
|
|
try:
|
|
return datetime.strptime(x, '%Y-%m-%d')
|
|
except:
|
|
return x
|
|
|
|
BaseData = pd.read_excel("原数据.xlsx", sheet_name="入职信息")
|
|
Promote = pd.read_excel("原数据.xlsx", sheet_name="职务变动") #
|
|
Level_Limit = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols="A:B", skiprows=2, names=["limit","role"])
|
|
|
|
BaseData = BaseData.drop(0)
|
|
BaseData = BaseData.reset_index(drop=True)
|
|
Promote = Promote.drop(0)
|
|
Promote = Promote.reset_index(drop=True)
|
|
|
|
for index, row in BaseData.iterrows():
|
|
for col in ["出生年月","任职年月","原职时间","参加工作时间","入职时间", "二档起始", "五档起始", "日期2"]:
|
|
BaseData.at[index, col] = custom_date_parser(row[col])
|
|
|
|
for index, row in Promote.iterrows():
|
|
for col in ["任职时间","工资执行时间"]:
|
|
Promote.at[index, col] = custom_date_parser(row[col])
|
|
|
|
logging.info("人员信息加载完成")
|
|
|
|
Rule_Role = []
|
|
col = 2
|
|
while True: # 职位规则
|
|
try:
|
|
rule = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
|
Rule_Role.append({
|
|
"start":rule.iloc[0,1],
|
|
"end":rule.iloc[1,1],
|
|
"rule":pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["role","salary"])
|
|
})
|
|
col += 2
|
|
except:
|
|
break
|
|
Rule_Level = []
|
|
col = 1
|
|
while True: # 职级规则
|
|
try:
|
|
rule = pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
|
Rule_Level.append({
|
|
"start":rule.iloc[0,1],
|
|
"end":rule.iloc[1,1],
|
|
"rule":pd.read_excel("原数据.xlsx", sheet_name="职级规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2, names=["level","salary"])
|
|
})
|
|
col += 2
|
|
except:
|
|
break
|
|
Rule_RoleName = []
|
|
col = 1
|
|
while True: # 名称变化
|
|
try:
|
|
rule = pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
|
Rule_RoleName.append({
|
|
"start":rule.iloc[0,1],
|
|
"end":rule.iloc[1,1],
|
|
"rule":pd.read_excel("原数据.xlsx", sheet_name="名称变化",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}",skiprows=2)
|
|
})
|
|
col += 2
|
|
except:
|
|
break
|
|
|
|
logging.info("规则加载完成")
|
|
|
|
Rule_Role = sorted(Rule_Role, key=lambda x: x['start'])
|
|
Rule_Level = sorted(Rule_Level, key=lambda x: x['start'])
|
|
Rule_RoleName = sorted(Rule_RoleName, key=lambda x: x['start'])
|
|
|
|
nowtime = datetime.now()
|
|
|
|
def split_level(level:str):
|
|
try:
|
|
parts = level.split('-')
|
|
return (int(parts[0]), int(parts[1]))
|
|
except:
|
|
logging.warning(f"职级[{level}]格式错误")
|
|
return (0, 0)
|
|
|
|
def role_salary(role:str, time):
|
|
for rule in Rule_Role:
|
|
if rule["start"] <= time <= rule["end"]:
|
|
try:
|
|
tmp = rule["rule"][rule["rule"]["role"] == role].iloc[0]
|
|
return tmp["salary"]
|
|
except:
|
|
if role == "":
|
|
logging.error("空职级")
|
|
else:
|
|
logging.warning(f"职位[{role}]在[{time}]时不存在工资规则")
|
|
return 0
|
|
|
|
def level_salary(level:str, time):
|
|
for rule in Rule_Level:
|
|
if rule["start"] <= time <= rule["end"]:
|
|
try:
|
|
tmp = rule["rule"][rule["rule"]["level"] == level].iloc[0]
|
|
return tmp["salary"]
|
|
except:
|
|
logging.warning(f"职级[{level}]在[{time}]时不存在工资规则")
|
|
return 0
|
|
|
|
def role_limit(role:str):
|
|
try:
|
|
tmp = Level_Limit[Level_Limit["role"] == role].iloc[0]
|
|
return tmp["limit"]
|
|
except:
|
|
logging.warning(f"职位[{role}]不存在职级上限规则")
|
|
return -1
|
|
|
|
def format_time(dt):
|
|
try:
|
|
return dt.strftime("%Y.%m")
|
|
except:
|
|
return dt
|
|
|
|
def to_int(x):
|
|
try:
|
|
return int(x)
|
|
except:
|
|
return 0
|
|
BaseData["工龄调增"] = BaseData["工龄调增"].apply(to_int)
|
|
BaseData["工龄调减"] = BaseData["工龄调减"].apply(to_int)
|
|
|
|
max_promote = 0
|
|
max_history = 0
|
|
|
|
def fill_basic_info(ws, row):# 填充基本信息
|
|
ws.cell(row=2, column=1, value=f"部门:{row['部门']} 职务:{row['职务']}")
|
|
ws.cell(row=3, column=2, value=row["姓名"])
|
|
ws.cell(row=3, column=4, value=row["性别"])
|
|
ws.cell(row=3, column=6, value=row["民族"])
|
|
ws.cell(row=5, column=2, value=row["现职"])
|
|
ws.cell(row=5, column=6, value=row["任职年限"])
|
|
ws.cell(row=6, column=2, value=row["原职"])
|
|
ws.cell(row=6, column=6, value=row["原职年限"])
|
|
ws.cell(row=7, column=2, value=row["学历"])
|
|
ws.cell(row=7, column=4, value=row["学龄"])
|
|
ws.cell(row=7, column=6, value=row["套改年限"])
|
|
ws.cell(row=7, column=7, value=row["职务工资"])
|
|
ws.cell(row=7, column=8, value=row["职务工资金额"])
|
|
ws.cell(row=7, column=9, value=row["级别工资"])
|
|
ws.cell(row=7, column=10, value=row["职务工资金额"])
|
|
ws.cell(row=17, column=1, value=row["个人评价结果"])
|
|
ws.cell(row=3, column=8, value=format_time(row["出生年月"]))
|
|
ws.cell(row=3, column=10, value=format_time(row["参加工作时间"]))
|
|
ws.cell(row=5, column=4, value=format_time(row["任职年月"]))
|
|
ws.cell(row=6, column=4, value=format_time(row["原职时间"]))
|
|
|
|
def fill_prompt_info(ws, promote):# 填充晋升信息
|
|
for index, prow in promote.iterrows():
|
|
if index > P_LIMIT-1:
|
|
logging.warning(f"超过[{P_LIMIT}]条晋升信息,共[{promote.shape[0]}]条。")
|
|
max_promote = max(max_promote, promote.shape[0])
|
|
break
|
|
try:
|
|
ws.cell(row=P_START+index, column=1, value=prow["任职时间"])
|
|
except:
|
|
logging.warning(f"晋升时间格式错误:{prow['任职时间']}")
|
|
ws.cell(row=P_START+index, column=2, value=prow["变动批注"])
|
|
ws.cell(row=P_START+index, column=3, value="任"+prow["新职务"])
|
|
|
|
def fill_history_info(ws, History_pd):# 填充历史记录
|
|
for index, hrow in History_pd.iterrows(): # 打印
|
|
for col in range(1, 11): # 复制样式
|
|
ws.cell(row=H_START+index, column=col)._style = ws.cell(row=H_START, column=col)._style
|
|
try:
|
|
ws.cell(row=H_START+index, column=1, value=hrow["时间"].strftime("%Y.%m"))
|
|
except:
|
|
logging.warning(f"历史时间格式错误:{hrow['时间']}")
|
|
ws.cell(row=H_START+index, column=2, value=hrow["职务"])
|
|
ws.cell(row=H_START+index, column=3, value=hrow["工资额1"])
|
|
ws.cell(row=H_START+index, column=4, value=hrow["级别档次"])
|
|
ws.cell(row=H_START+index, column=5, value=hrow["工资额2"])
|
|
ws.cell(row=H_START+index, column=6, value=hrow["工资合计"])
|
|
ws.cell(row=H_START+index, column=7, value=hrow["变动原因"])
|
|
# ws.cell(row=H_START+index, column=8, value=index) # Debug
|
|
|
|
BaseData["Latest_Role"] = None
|
|
BaseData["Latest_Prom"] = None
|
|
|
|
for index, row in BaseData.iterrows():
|
|
try:
|
|
logging.info(f"台账:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
|
|
BaseData.at[index, "Latest_Role"] = row["初始职务"]
|
|
BaseData.at[index, "Latest_Prom"] = row["入职时间"]
|
|
wb = load_workbook("模板/个人台账.xlsx")
|
|
ws = wb.active
|
|
fill_basic_info(ws, row)# 填充基本信息
|
|
|
|
# 查找晋升信息
|
|
promote = Promote[Promote["身份证号"] == row["身份证号码"]]
|
|
if not promote.empty:
|
|
promote = promote.sort_values(by="任职时间", ascending=False).reset_index(drop=True)
|
|
BaseData.at[index, "Latest_Role"] = promote.iloc[0]["新职务"]
|
|
BaseData.at[index, "Latest_Prom"] = promote.iloc[0]["任职时间"]
|
|
if promote.shape[0] > 1:
|
|
BaseData.at[index, "职务2"] = promote.iloc[1]["新职务"]
|
|
BaseData.at[index, "日期2"] = promote.iloc[1]["任职时间"]
|
|
|
|
promote = promote.sort_values(by="任职时间").reset_index(drop=True)
|
|
fill_prompt_info(ws, promote)# 填充晋升信息
|
|
|
|
# 根据规则匹配职级薪资
|
|
History_pd = pd.DataFrame(columns=["时间", "职务", "工资额1", "级别档次", "工资额2", "工资合计", "变动原因"])
|
|
# 添加入职记录
|
|
History_pd.loc[len(History_pd)] = [row["入职时间"], row["初始职务"], "", row["入职时的初始级别"], "", "", "套改/定级"]
|
|
for index, prow in promote.iterrows(): # 添加晋升记录
|
|
History_pd.loc[len(History_pd)] = [prow["工资执行时间"], prow["新职务"], "", "", "", "", "晋升"]
|
|
try:
|
|
calctime=row["二档起始"] + relativedelta(minute=1)
|
|
while True: # 添加二级记录
|
|
calctime += relativedelta(years=2)
|
|
if calctime > nowtime:
|
|
break
|
|
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "两年晋档"]
|
|
calctime=row["五档起始"]
|
|
while True: # 添加五级记录
|
|
calctime += relativedelta(years=5)
|
|
if calctime > nowtime:
|
|
break
|
|
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "五年晋级"]
|
|
except:
|
|
raise Exception(f"二、五档起始时间格式错误:{row['二档起始']}或{row['五档起始']}")
|
|
for rule in Rule_Level: # 工资调标
|
|
if row["入职时间"] < rule["start"]:
|
|
History_pd.loc[len(History_pd)] = [rule["start"], "", "", "", "", "", "工资调标"]
|
|
History_pd = History_pd.sort_values(by="时间").reset_index(drop=True)
|
|
|
|
if History_pd.at[0,"时间"] != row["入职时间"]:
|
|
raise Exception(f"入职时间晚于其他计算的时间:{row['入职时间']} < {History_pd.at[0,'时间']} ({History_pd.at[0,'变动原因']})")
|
|
for index, hrow in History_pd.iterrows(): # 数据计算
|
|
# 调整职务职级
|
|
if index > 0 and hrow["职务"] == "":
|
|
History_pd.at[index, "职务"] = History_pd.iloc[index-1]["职务"]
|
|
for rule in Rule_RoleName: # 名称变化
|
|
if rule["start"] <= hrow["时间"] <= rule["end"]:
|
|
if History_pd.iloc[index]["职务"] in rule["rule"]["原名称"].values:
|
|
History_pd.at[index, "职务"] = rule["rule"][rule["rule"]["原名称"] == History_pd.iloc[index]["职务"]]["现名称"].values[0]
|
|
if index > 0 and hrow["级别档次"] == "":
|
|
jb, dc = split_level(History_pd.iloc[index-1]["级别档次"])
|
|
if hrow["变动原因"] == "两年晋档":
|
|
History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}"
|
|
elif hrow["变动原因"] == "五年晋级":
|
|
History_pd.at[index, "级别档次"] = f"{jb-1}-{dc-1}"
|
|
elif hrow["变动原因"] == "工资调标":
|
|
History_pd.at[index, "级别档次"] = f"{jb}-{dc}"
|
|
elif hrow["变动原因"] == "晋升":
|
|
if role_limit(History_pd.iloc[index]["职务"]) == 99: # 99tag
|
|
History_pd.at[index, "级别档次"] = f"{jb}-{dc}"
|
|
elif jb == role_limit(History_pd.iloc[index]["职务"]):
|
|
History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}"
|
|
else:
|
|
History_pd.at[index, "级别档次"] = f"{jb-1}-{dc-1}"
|
|
# 计算工资
|
|
History_pd.at[index, "工资额1"] = role_salary(History_pd.iloc[index]["职务"], hrow["时间"])
|
|
History_pd.at[index, "工资额2"] = level_salary(History_pd.iloc[index]["级别档次"], hrow["时间"])
|
|
History_pd.at[index, "工资合计"] = to_int(History_pd.iloc[index]["工资额1"]) + to_int(History_pd.iloc[index]["工资额2"])
|
|
|
|
fill_history_info(ws, History_pd)# 填充历史记录
|
|
wb.save(f"个人台账/{row['身份证号码']}_{row['姓名']}.xlsx")
|
|
except Exception as e:
|
|
logging.error(f"{row['身份证号码']}:{e}")
|
|
|
|
wb = load_workbook("模板/汇总名册.xlsx")
|
|
ws = wb.active
|
|
for index, row in BaseData.iterrows(): # 汇总
|
|
try:
|
|
logging.info(f"汇总:第[{index+1}]共[{BaseData.shape[0]}]现在是[{row['身份证号码']}]")
|
|
for col in range(1,16):
|
|
ws.cell(row=6+index, column=col)._style = ws.cell(row=6, column=col)._style
|
|
ws.cell(row=6+index, column=1, value=index+1)
|
|
ws.cell(row=6+index, column=2, value=row["姓名"])
|
|
ws.cell(row=6+index, column=3, value=row["性别"])
|
|
ws.cell(row=6+index, column=4, value=format_time(row["出生年月"]))
|
|
ws.cell(row=6+index, column=5, value=format_time(row["参加工作时间"]))
|
|
ws.cell(row=6+index, column=6, value=row["学历"])
|
|
ws.cell(row=6+index, column=7, value=relativedelta(nowtime, row["入职时间"]).years+row["工龄调增"]-row["工龄调减"]+1)
|
|
ws.cell(row=6+index, column=8, value=relativedelta(nowtime, row["入职时间"]).years)
|
|
ws.cell(row=6+index, column=9, value=row["工龄调增"])
|
|
ws.cell(row=6+index, column=10, value=row["工龄调减"])
|
|
ws.cell(row=6+index, column=11, value=row["Latest_Role"])
|
|
ws.cell(row=6+index, column=12, value=format_time(row["Latest_Prom"]))
|
|
ws.cell(row=6+index, column=13, value=row["职务2"])
|
|
ws.cell(row=6+index, column=14, value=format_time(row["日期2"]))
|
|
except Exception as e:
|
|
logging.error(f"{row['身份证号码']}:{e}")
|
|
wb.save("汇总名册.xlsx") # 保存汇总
|
|
|
|
if max_promote > 0:
|
|
logging.warning(f"最多有[{max_promote}]条晋升信息,需要调整模板。记得同时调整薪资历史的起始行和个人评价结果。")
|
|
if max_history > 0:
|
|
logging.warning(f"最多有[{max_history}]条薪资历史,需要调整模板。") |