Compare commits
10 Commits
af32927f5b
...
f77bf1d096
Author | SHA1 | Date | |
---|---|---|---|
f77bf1d096 | |||
bef29011c3 | |||
915bd806a7 | |||
4db6f6c10f | |||
5cdb6e3d02 | |||
8d5931db63 | |||
f00cd31e46 | |||
4def3c7dc6 | |||
370131c773 | |||
f1ba0e875f |
188
main.py
188
main.py
@ -9,34 +9,91 @@ from datetime import datetime
|
||||
# 配置日志记录
|
||||
logging.basicConfig(filename='log.txt', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
# 常量
|
||||
|
||||
P_LIMIT = 6 # 最大晋升次数
|
||||
|
||||
P_START = 10 # 晋升记录开始行
|
||||
H_START = 15 + P_LIMIT # 历史记录开始行
|
||||
|
||||
# 自定义日期解析函数
|
||||
NOWTIME = datetime.now()
|
||||
|
||||
# 全局变量
|
||||
|
||||
## 通过函数读取
|
||||
BaseData = pd.DataFrame()
|
||||
Promote = pd.DataFrame()
|
||||
Rule_Role = []
|
||||
Rule_Level = []
|
||||
Rule_RoleName = []
|
||||
Level_Limit = pd.DataFrame()
|
||||
Promote_Level = pd.DataFrame()
|
||||
Promote_verify = pd.DataFrame()
|
||||
|
||||
## 统计量
|
||||
max_promote = 0
|
||||
max_history = 0
|
||||
|
||||
# 工具函数
|
||||
|
||||
def custom_date_parser(x):
|
||||
try:
|
||||
return datetime.strptime(x, '%Y-%m-%d')
|
||||
except:
|
||||
return x
|
||||
|
||||
def format_time(dt,info):
|
||||
try:
|
||||
return dt.strftime("%Y.%m")
|
||||
except:
|
||||
logging.warning(f"[{info}]时间格式错误:{dt}")
|
||||
return dt
|
||||
|
||||
def to_int(x):
|
||||
try:
|
||||
return int(x)
|
||||
except:
|
||||
return 0
|
||||
|
||||
def fallback(x):
|
||||
for i in x:
|
||||
if pd.notna(i) and i != '':
|
||||
return i
|
||||
return ''
|
||||
|
||||
def split_level(level:str):
|
||||
try:
|
||||
parts = level.split('-')
|
||||
return (int(parts[0]), int(parts[1]))
|
||||
except:
|
||||
raise Exception(f"职级[{level}]格式错误")
|
||||
|
||||
# 读取信息
|
||||
|
||||
def read_base_data(): # 读取员工数据
|
||||
global BaseData
|
||||
BaseData = pd.read_excel("原数据.xlsx", sheet_name="入职信息")
|
||||
Promote = pd.read_excel("原数据.xlsx", sheet_name="职务变动") #
|
||||
|
||||
for index, row in BaseData.iterrows():
|
||||
for col in ["出生年月","任职年月","原职时间","参加工作时间","入职时间", "晋档起始", "晋级起始", "日期2"]:
|
||||
BaseData.at[index, col] = custom_date_parser(row[col])
|
||||
BaseData[col] = BaseData[col].apply(custom_date_parser)
|
||||
for col in ["晋档起始", "晋级起始"]:
|
||||
BaseData[col] = BaseData[col].apply(lambda x: datetime(x.year, 1, 1) if isinstance(x, datetime) else x)
|
||||
BaseData["Latest_Role"] = None
|
||||
BaseData["Latest_Prom"] = None
|
||||
BaseData["工龄调增"] = BaseData["工龄调增"].apply(to_int)
|
||||
BaseData["工龄调减"] = BaseData["工龄调减"].apply(to_int)
|
||||
BaseData["学龄"] = BaseData["学龄"].apply(to_int)
|
||||
BaseData["工龄"] = BaseData.apply(lambda row: NOWTIME.year-row["参加工作时间"].year+row["工龄调增"]-row["工龄调减"]+1, axis=1)
|
||||
|
||||
for index, row in Promote.iterrows():
|
||||
def read_promote(): # 读取晋升记录
|
||||
global Promote
|
||||
Promote = pd.read_excel("原数据.xlsx", sheet_name="职务变动")
|
||||
for col in ["任职时间","工资执行时间"]:
|
||||
Promote.at[index, col] = custom_date_parser(row[col])
|
||||
Promote[col] = Promote[col].apply(custom_date_parser)
|
||||
|
||||
logging.info("人员信息加载完成")
|
||||
|
||||
Rule_Role = []
|
||||
def read_rule_role(): # 读取职位规则
|
||||
global Rule_Role
|
||||
col = 4
|
||||
while True: # 职位规则
|
||||
while True:
|
||||
try:
|
||||
rule = pd.read_excel("原数据.xlsx", sheet_name="职位规则",usecols=f"{get_column_letter(col)}:{get_column_letter(col+1)}", header=None)
|
||||
Rule_Role.append({
|
||||
@ -47,7 +104,10 @@ while True: # 职位规则
|
||||
col += 2
|
||||
except:
|
||||
break
|
||||
Rule_Level = []
|
||||
Rule_Role = sorted(Rule_Role, key=lambda x: x['start'])
|
||||
|
||||
def read_rule_level(): # 读取职级规则
|
||||
global Rule_Level
|
||||
col = 1
|
||||
while True: # 职级规则
|
||||
try:
|
||||
@ -60,7 +120,10 @@ while True: # 职级规则
|
||||
col += 2
|
||||
except:
|
||||
break
|
||||
Rule_RoleName = []
|
||||
Rule_Level = sorted(Rule_Level, key=lambda x: x['start'])
|
||||
|
||||
def read_rule_role_name(): # 读取名称变化规则
|
||||
global Rule_RoleName
|
||||
col = 1
|
||||
while True: # 名称变化
|
||||
try:
|
||||
@ -73,8 +136,10 @@ while True: # 名称变化
|
||||
col += 2
|
||||
except:
|
||||
break
|
||||
Rule_RoleName = sorted(Rule_RoleName, key=lambda x: x['start'])
|
||||
|
||||
# 读取职位对应的级别限制
|
||||
def read_level_limit(): # 读取职位对应的级别限制
|
||||
global Level_Limit, Promote_Level
|
||||
Level_Limit_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="A:A", skiprows=2, names=["limit"])
|
||||
Promote_Level_tmp = pd.read_excel("原数据.xlsx", sheet_name="职位规则", usecols="B:C", skiprows=2, names=["级别","档次"])
|
||||
Level_Limit = {}
|
||||
@ -84,24 +149,24 @@ for rule in Rule_Role:
|
||||
Level_Limit[row["role"]] = Level_Limit_tmp.iloc[index]["limit"]
|
||||
Promote_Level[row["role"]] = (Promote_Level_tmp.iloc[index]["级别"], Promote_Level_tmp.iloc[index]["档次"])
|
||||
|
||||
# 晋升校验
|
||||
Promote_verify = Level_Limit_tmp = pd.read_excel("原数据.xlsx", sheet_name="晋升校验", usecols="A:B")
|
||||
def read_promote_verify(): # 读取晋升校验
|
||||
global Promote_verify
|
||||
Promote_verify = pd.read_excel("原数据.xlsx", sheet_name="晋升校验", usecols="A:B")
|
||||
|
||||
def load_people():
|
||||
read_base_data()
|
||||
read_promote()
|
||||
logging.info("人员信息加载完成")
|
||||
|
||||
def load_rule():
|
||||
read_rule_role()
|
||||
read_rule_level()
|
||||
read_rule_role_name()
|
||||
read_level_limit()
|
||||
read_promote_verify()
|
||||
logging.info("规则加载完成")
|
||||
|
||||
Rule_Role = sorted(Rule_Role, key=lambda x: x['start'])
|
||||
Rule_Level = sorted(Rule_Level, key=lambda x: x['start'])
|
||||
Rule_RoleName = sorted(Rule_RoleName, key=lambda x: x['start'])
|
||||
|
||||
nowtime = datetime.now()
|
||||
|
||||
def split_level(level:str):
|
||||
try:
|
||||
parts = level.split('-')
|
||||
return (int(parts[0]), int(parts[1]))
|
||||
except:
|
||||
raise Exception(f"职级[{level}]格式错误")
|
||||
return (0, 0)
|
||||
# 获取配置类函数
|
||||
|
||||
def role_salary(role:str, time):
|
||||
for rule in Rule_Role:
|
||||
@ -133,30 +198,7 @@ def role_limit(role:str):
|
||||
logging.warning(f"职位[{role}]不存在职级上限规则")
|
||||
return -1
|
||||
|
||||
def format_time(dt,info):
|
||||
try:
|
||||
return dt.strftime("%Y.%m")
|
||||
except:
|
||||
logging.warning(f"[{info}]时间格式错误:{dt}")
|
||||
return dt
|
||||
|
||||
def to_int(x):
|
||||
try:
|
||||
return int(x)
|
||||
except:
|
||||
return 0
|
||||
BaseData["工龄调增"] = BaseData["工龄调增"].apply(to_int)
|
||||
BaseData["工龄调减"] = BaseData["工龄调减"].apply(to_int)
|
||||
BaseData["学龄"] = BaseData["学龄"].apply(to_int)
|
||||
|
||||
def fallback(x):
|
||||
for i in x:
|
||||
if pd.notna(i) and i != '':
|
||||
return i
|
||||
return ''
|
||||
|
||||
max_promote = 0
|
||||
max_history = 0
|
||||
# 填充类辅助函数
|
||||
|
||||
def fill_basic_info(ws, row):# 填充基本信息
|
||||
ws.cell(row=2, column=1, value=f"部门:{row['部门']} 职务:{row['职务']}")
|
||||
@ -206,11 +248,13 @@ def fill_history_info(ws, History_pd):# 填充历史记录
|
||||
ws.cell(row=H_START+index, column=7, value=hrow["变动原因"])
|
||||
# ws.cell(row=H_START+index, column=8, value=index) # Debug
|
||||
|
||||
BaseData["Latest_Role"] = None
|
||||
BaseData["Latest_Prom"] = None
|
||||
def main():
|
||||
|
||||
load_people()
|
||||
load_rule()
|
||||
|
||||
# 创建一个空的DataFrame来存储所有历史记录
|
||||
all_history = pd.DataFrame(columns=["身份证号码", "姓名", "时间", "职务", "职务工资", "级别档次", "级别工资", "工资合计", "变动原因"])
|
||||
all_history = pd.DataFrame(columns=["身份证号码", "姓名", "时间", "职务", "职务工资", "级别档次", "级别工资", "工资合计", "变动原因", "晋升备注"])
|
||||
|
||||
for index, row in BaseData.iterrows():
|
||||
try:
|
||||
@ -224,7 +268,7 @@ for index, row in BaseData.iterrows():
|
||||
# 查找晋升信息
|
||||
promote = Promote[Promote["身份证号"] == row["身份证号码"]]
|
||||
if not promote.empty:
|
||||
promote = promote.sort_values(by="任职时间", ascending=False).reset_index(drop=True)
|
||||
promote = promote.sort_values(by=["工资执行时间", "任职时间"], ascending=[False, False]).reset_index(drop=True)
|
||||
BaseData.at[index, "Latest_Role"] = promote.iloc[0]["新职务"]
|
||||
BaseData.at[index, "Latest_Prom"] = promote.iloc[0]["任职时间"]
|
||||
# 把原职务取出来
|
||||
@ -235,33 +279,33 @@ for index, row in BaseData.iterrows():
|
||||
BaseData.at[index, "职务2"] = row["初始职务"]
|
||||
BaseData.at[index, "日期2"] = row["入职时间"]
|
||||
|
||||
promote = promote.sort_values(by="任职时间").reset_index(drop=True)
|
||||
promote = promote.sort_values(by=["工资执行时间", "任职时间"]).reset_index(drop=True)
|
||||
fill_prompt_info(ws, promote)# 填充晋升信息
|
||||
|
||||
# 根据规则匹配职级薪资
|
||||
History_pd = pd.DataFrame(columns=["时间", "职务", "职务工资", "级别档次", "级别工资", "工资合计", "变动原因"])
|
||||
History_pd = pd.DataFrame(columns=["时间", "职务", "职务工资", "级别档次", "级别工资", "工资合计", "变动原因", "晋升备注"])
|
||||
# 添加入职记录
|
||||
History_pd.loc[len(History_pd)] = [row["入职时间"], row["初始职务"], "", row["入职时的初始级别"], "", "", "套改/定级"]
|
||||
History_pd.loc[len(History_pd)] = [row["入职时间"], row["初始职务"], "", row["入职时的初始级别"], "", "", "套改/定级", ""]
|
||||
for index, prow in promote.iterrows(): # 添加晋升记录
|
||||
History_pd.loc[len(History_pd)] = [prow["工资执行时间"], prow["新职务"], "", "", "", "", "晋升"]
|
||||
History_pd.loc[len(History_pd)] = [prow["工资执行时间"]+relativedelta(hours=prow["任职时间"].month,minutes=prow["任职时间"].day), prow["新职务"], "", "", "", "", "晋升", f"任{prow['新职务']} {prow['变动批注'] if pd.notna(prow['变动批注']) else ''}"]
|
||||
try:
|
||||
calctime=row["晋档起始"] + relativedelta(minute=1)
|
||||
while True: # 添加晋档记录
|
||||
calctime += relativedelta(years=row["晋档间隔"])
|
||||
if calctime > nowtime:
|
||||
if calctime > NOWTIME:
|
||||
break
|
||||
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "两年晋档"]
|
||||
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "两年晋档", ""]
|
||||
calctime=row["晋级起始"]
|
||||
while True: # 添加晋级记录
|
||||
calctime += relativedelta(years=row["晋级间隔"])
|
||||
if calctime > nowtime:
|
||||
if calctime > NOWTIME:
|
||||
break
|
||||
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "五年晋级"]
|
||||
History_pd.loc[len(History_pd)] = [calctime, "", "", "", "", "", "五年晋级", ""]
|
||||
except:
|
||||
raise Exception(f"晋级、档起始时间格式错误:{row['晋档起始']}或{row['晋级起始']}")
|
||||
raise Exception(f"晋级、档起始或间隔时间格式错误:{row['晋级起始']}-{row['晋档起始']}-{row['晋级间隔']}-{row['晋档间隔']}")
|
||||
for rule in Rule_Level: # 工资调标
|
||||
if row["入职时间"] < rule["start"]:
|
||||
History_pd.loc[len(History_pd)] = [rule["start"], "", "", "", "", "", "工资调标"]
|
||||
History_pd.loc[len(History_pd)] = [rule["start"], "", "", "", "", "", "工资调标", ""]
|
||||
History_pd = History_pd.sort_values(by="时间").reset_index(drop=True)
|
||||
|
||||
if History_pd.at[0,"时间"] != row["入职时间"]:
|
||||
@ -279,6 +323,9 @@ for index, row in BaseData.iterrows():
|
||||
if hrow["变动原因"] == "两年晋档":
|
||||
History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}"
|
||||
elif hrow["变动原因"] == "五年晋级":
|
||||
if jb-1 < 1 or jb-1 < role_limit(History_pd.iloc[index]["职务"]):
|
||||
History_pd.at[index, "级别档次"] = f"{jb}-{dc+1}"
|
||||
else:
|
||||
History_pd.at[index, "级别档次"] = f"{jb-1}-{dc-1}"
|
||||
elif hrow["变动原因"] == "工资调标":
|
||||
History_pd.at[index, "级别档次"] = f"{jb}-{dc}"
|
||||
@ -339,8 +386,8 @@ for index, row in BaseData.iterrows(): # 汇总
|
||||
ws.cell(row=6+index, column=4, value=format_time(row["出生年月"], "出生年月"))
|
||||
ws.cell(row=6+index, column=5, value=format_time(row["参加工作时间"], "参加工作时间"))
|
||||
ws.cell(row=6+index, column=6, value=fallback([row["现学历"],row["学历"]]))
|
||||
ws.cell(row=6+index, column=7, value=nowtime.year-row["参加工作时间"].year+row["工龄调增"]-row["工龄调减"]+1+row["学龄"])
|
||||
ws.cell(row=6+index, column=8, value=nowtime.year-row["参加工作时间"].year+row["工龄调增"]-row["工龄调减"]+1)
|
||||
ws.cell(row=6+index, column=7, value=row['工龄']+row["学龄"])
|
||||
ws.cell(row=6+index, column=8, value=row['工龄'])
|
||||
ws.cell(row=6+index, column=9, value=row["学龄"])
|
||||
ws.cell(row=6+index, column=10, value=row["工龄调减"])
|
||||
ws.cell(row=6+index, column=11, value=row["Latest_Role"])
|
||||
@ -355,3 +402,6 @@ if max_promote > 0:
|
||||
logging.warning(f"最多有[{max_promote}]条晋升信息,需要调整模板。记得同时调整薪资历史的起始行和个人评价结果。")
|
||||
if max_history > 0:
|
||||
logging.warning(f"最多有[{max_history}]条薪资历史,需要调整模板。")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
x
Reference in New Issue
Block a user