思想
ES数据库同步PG数据库在本文主要用到的思想是:在PG数据库的数据录入以及更新时,如下图所示,会有其对应的字段modify_time记录最后的修改时间。程序会记录同步到ES数据库中最后一条数据的修改时间,利用线程间隔10s检查一次PG数据库是否有modify_time > 记录的最后修改时间。如果有,将最新的数据同步到ES数据库,并修改记录时间。如果没有,继续每隔10s检查一次。
最终的实现效果:
程序实现的主要代码
def update_pg2es():
global last_modify_time
print(type(last_modify_time
), last_modify_time
)
print("检测PG数据库是否有数据更新...")
pg_conn
= psycopg2
.connect
(config
.CONNECT_PG_DB_URL
)
pg_cursor
= pg_conn
.cursor
()
select_sql
= "select * from test where modify_time > '{}' order by modify_time".format(last_modify_time
)
pg_cursor
.execute
(select_sql
)
datas
= pg_cursor
.fetchall
()
pg_conn
.close
pg_cursor
.close
es_data_list
= []
for row
in datas
:
column_values
= get_row_colum_info
(pg_cursor
, row
)
es_data
= translate_entity_date_to_es
(column_values
)
es_data_list
.append
(es_data
)
last_modify_time
= column_values
['modify_time']
index_entity
= config
.entity_index
type_entity
= config
.entity_type
num
= 10000
while (len(es_data_list
) / num
>= 0):
if (math
.floor
(len(es_data_list
) / num
) == 0):
es_result
= es_service
.insert_data_list
(index_entity
, type_entity
, es_data_list
[:])
print("存入{}个数据到ES".format(len(es_data_list
)))
break
print("存入{}个数据到ES".format(num
))
print("前五个数据:")
print(index_entity
, type_entity
, es_data_list
[:5])
es_result
= es_service
.insert_data_list
(index_entity
, type_entity
, es_data_list
[:num
])
es_data_list
= es_data_list
[num
:]
if "ok" != es_result
:
print("存储实体数据到es中失败")
break
t
= threading
.Timer
(10, update_pg2es
)
t
.start
()
if __name__
== '__main__':
update_pg2es
()
注意:
select now()::timestamp(6)without time zone SQL语句可以查询当前时间(不带时区,精度为秒后面保存6位)select_sql = "select * from test where modify_time > '{}' order by modify_time".format(last_modify_time)将sql语句赋值给select_sql,sql语句的意思为选择出修改时间大于记录时间的数据,并将数据按照修改时间进行排序。
此处注意记录时间两边需要加单引号才能保证sql语句的正常运行。