如果使用 nacos-sdk-python(请注意其适用的 Nacos 版本),需要先按照下面链接中的说明修复 SDK 源码中的 bug:
https://github.com/nacos-group/nacos-sdk-python/issues/135
代码如下:
import json
import socket
import threading

import nacos
import requests
from flask import Flask

app = Flask(__name__)


class NacosReg:
    """Register this service with a Nacos server and configure its HTTP health check.

    Two registration paths are provided: ``service_register`` goes through the
    nacos-sdk-python client, while ``service_register_`` calls the Nacos HTTP
    API directly with ``requests``.
    """

    def __init__(self):
        self.SERVER_HOST = "127.0.0.1"
        self.SERVER_PORT = 8848
        self.SERVER_ADDRESSES = f"http://{self.SERVER_HOST}:{self.SERVER_PORT}"
        self.SERVICE_NAME = "test_server"
        self.GROUP_NAME = "DEFAULT_GROUP"
        self.username = "nacos"
        self.password = "nacos"
        self.client = nacos.NacosClient(
            self.SERVER_ADDRESSES,
            username=self.username,
            password=self.password,
        )
        # Port on which the local service (the Flask app below) listens.
        self.model_port = 8000
        # Seconds to wait for Nacos to answer a direct HTTP call; without a
        # timeout an unreachable server would block the calling thread forever.
        self.REQUEST_TIMEOUT = 5

    def get_host_ip(self):
        """Return the local IP address used to reach the Nacos server.

        A datagram socket is connected to the Nacos host/port and the local
        end of that socket is read back with ``getsockname()``.

        Raises:
            OSError: if the socket cannot be created or connected.
        """
        # The context manager closes the socket on every path and, unlike a
        # try/finally around the constructor, cannot hit an unbound name when
        # socket creation itself fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.connect((self.SERVER_HOST, self.SERVER_PORT))
            return sock.getsockname()[0]

    def service_register_and_check(self):
        """Register the instance via the HTTP API, then set its HTTP health check."""
        self.service_register_()
        self.set_http_check()

    def service_register(self):
        """Register a persistent (non-ephemeral) instance through the SDK client."""
        self.client.add_naming_instance(
            self.SERVICE_NAME,
            self.get_host_ip(),
            self.model_port,
            ephemeral=False,
            group_name=self.GROUP_NAME,
        )

    def service_register_(self):
        """Register a persistent instance by POSTing to ``/nacos/v1/ns/instance``.

        The response body is printed so the outcome is visible on the console.
        """
        params = {
            "ip": self.get_host_ip(),
            "port": self.model_port,
            "serviceName": self.SERVICE_NAME,
            "weight": 1.0,
            # The Open API parameter is "enabled"; the previous key "enable"
            # is not a documented parameter.
            "enabled": True,
            "healthy": True,
            "clusterName": "None",
            "ephemeral": False,
            "groupName": self.GROUP_NAME,
            'username': self.username,
            'password': self.password,
        }
        res = requests.post(
            f"{self.SERVER_ADDRESSES}/nacos/v1/ns/instance",
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        )
        print(res.text)

    def service_unregister(self):
        """Remove the persistent instance through the SDK client."""
        self.client.remove_naming_instance(
            self.SERVICE_NAME,
            self.get_host_ip(),
            self.model_port,
            ephemeral=False,
            group_name=self.GROUP_NAME,
        )

    def set_http_check(self):
        """Configure an HTTP health checker on the cluster via ``/nacos/v1/ns/cluster``.

        The checker probes the ``/test`` path; ``useInstancePort4Check`` is set
        so the instance's own port is requested rather than ``checkPort``.
        The response body is printed.
        """
        data = {
            "serviceName": f"{self.GROUP_NAME}@@{self.SERVICE_NAME}",
            "clusterName": "None",
            "checkPort": 80,
            "useInstancePort4Check": True,
            # The health checker definition is sent as a JSON-encoded string.
            "healthChecker": json.dumps({"type": "HTTP", "path": "/test"}),
            "namespaceId": "",
        }
        res = requests.put(
            f"{self.SERVER_ADDRESSES}/nacos/v1/ns/cluster",
            params={'username': self.username, 'password': self.password},
            data=data,
            timeout=self.REQUEST_TIMEOUT,
        )
        print(res.text)

    def service_list(self):
        """Return the healthy instances of this service as reported by the SDK client."""
        return self.client.list_naming_instance(self.SERVICE_NAME, healthy_only=True)


@app.route("/test", methods=['GET'])
def test():
    """Health-check endpoint; this is the path configured in ``set_http_check``."""
    return "ok"


if __name__ == '__main__':
    # Registration runs on a timer thread one second after start-up, while the
    # main thread is occupied by the (blocking) Flask server.
    threading.Timer(1, NacosReg().service_register_and_check).start()
    app.run(port=8000)
服务器运行情况:
nacos服务器情况:
HTTP 探活配置——单击上图中的“集群配置”:
参考资料:
GitHub - nacos-group/nacos-sdk-python: nacos python sdk
Open API 指南 | Nacos
还没有评论,来说两句吧...