openstack nova 源码分析3
nova下的service.py的源码,今天阅读之后 直接就把我理解的以注释的形式添加到了源码中,有些地方不好或者是错了,希望大家帮我指出![*]
[*]import inspect
[*]import os
[*]
[*]import eventlet
[*]import greenlet
[*]
[*]from nova import context
[*]from nova import db
[*]from nova import exception
[*]from nova import flags
[*]from nova import log as logging
[*]from nova import rpc
[*]from nova import utils
[*]from nova import version
[*]from nova import wsgi
[*]
[*]
[*]LOG = logging.getLogger('nova.service')
[*]
[*]FLAGS = flags.FLAGS#flags.py中有一句 FLAGS = FlagValues(),那么直接查看FlagValues()这个类,会发现这个类是继承于gflags.FlagValues.
[*]
[*]flags.DEFINE_integer('report_interval', 10,
[*] 'seconds between nodes reporting state to datastore',
[*] lower_bound=1)#参数名称,默认值和简短说明
[*]flags.DEFINE_integer('periodic_interval', 60,
[*] 'seconds between running periodic tasks',
[*] lower_bound=1)
[*]flags.DEFINE_string('ec2_listen', "0.0.0.0",
[*] 'IP address for EC2 API to listen')
[*]flags.DEFINE_integer('ec2_listen_port', 8773, 'port for ec2 api to listen')
[*]flags.DEFINE_string('osapi_listen', "0.0.0.0",
[*] 'IP address for OpenStack API to listen')
[*]flags.DEFINE_integer('osapi_listen_port', 8774, 'port for os api to listen')
[*]flags.DEFINE_string('api_paste_config', "api-paste.ini",
[*] 'File name for the paste.deploy config for nova-api')
[*]
[*]#Launcher 类 包含run_server ,launch_server,stop,wait 4个函数,其实功能非常简单,首先初始化Launcher将self_services弄成一个空列表
[*]#run_server就是开始和等待一个server的完成,launcher_server就是将该server加入到该类初始化的列表中,
[*]#stop 就是for循环遍历self_services列表中的服务然后一个一个的kill掉
[*]#wait 函数就是 Waits until all services have been stopped
[*]class Launcher(object):
[*] """Launch one or more services and wait for them to complete."""
[*]
[*] def __init__(self):
[*] """Initialize the service launcher.
[*]
[*] :returns: None
[*]
[*] """
[*] self._services = []
[*]
[*] @staticmethod
[*] def run_server(server):
[*] """Start and wait for a server to finish.
[*]
[*] :param service: Server to run and wait for.
[*] :returns: None
[*]
[*] """
[*] server.start()
[*] server.wait()
[*]
[*] def launch_server(self, server):#在360行被调用
[*] """Load and start the given server.
[*]
[*] :param server: The server you would like to start.
[*] :returns: None
[*]
[*] """
[*] gt = eventlet.spawn(self.run_server, server)
[*] self._services.append(gt)
[*]
[*] def stop(self):
[*] """Stop all services which are currently running.
[*]
[*] :returns: None
[*]
[*] """
[*] for service in self._services:
[*] service.kill()
[*]
[*] def wait(self):
[*] """Waits until all services have been stopped, and then returns.
[*]
[*] :returns: None
[*]
[*] """
[*] for service in self._services:
[*] try:
[*] service.wait()
[*] except greenlet.GreenletExit:
[*] pass
[*]
[*]
[*]class Service(object):
[*] """Service object for binaries running on hosts.
[*]
[*] A service takes a manager and enables rpc by listening to queues based
[*] on topic. It also periodically runs tasks on the manager and reports
[*] it state to the database services table."""
[*]
[*] def __init__(self, host, binary, topic, manager, report_interval=None,
[*] periodic_interval=None, *args, **kwargs):
[*] self.host = host
[*] self.binary = binary
[*] self.topic = topic
[*] #动态的生成manager类,并动态生成实例
[*] self.manager_class_name = manager#在create函数中指定
[*] manager_class = utils.import_class(self.manager_class_name)#动态的import该类
[*] self.manager = manager_class(host=self.host, *args, **kwargs)#动态的生成实例
[*] #设置参数:应该是服务间隔时间之类的。
[*] self.report_interval = report_interval
[*] self.periodic_interval = periodic_interval
[*] #设置多出来的一些参数。
[*] super(Service, self).__init__(*args, **kwargs)
[*] self.saved_args, self.saved_kwargs = args, kwargs
[*] #设置一个列表 不知道是不是后面有需要用的地方 果然在185行 发现了
[*] self.timers = []
[*]
[*] def start(self):
[*] #设置版本
[*] vcs_string = version.version_string_with_vcs()
[*] logging.audit(_('Starting %(topic)s node (version %(vcs_string)s)'),
[*] {'topic': self.topic, 'vcs_string': vcs_string})
[*] #初始化host
[*] self.manager.init_host()
[*]
[*] self.model_disconnected = False
[*] ctxt = context.get_admin_context()
[*] try:
[*] service_ref = db.service_get_by_args(ctxt,
[*] self.host,
[*] self.binary)
[*] self.service_id = service_ref['id']
[*] except exception.NotFound:
[*] self._create_service_ref(ctxt)#该函数位于187行
[*]
[*] if 'nova-compute' == self.binary:
[*] self.manager.update_available_resource(ctxt)
[*]
[*] self.conn = rpc.create_connection(new=True)
[*] logging.debug("Creating Consumer connection for Service %s" %
[*] self.topic)
[*]
[*] # Share this same connection for these Consumers
[*] self.conn.create_consumer(self.topic, self, fanout=False)
[*]
[*] node_topic = '%s.%s' % (self.topic, self.host)#节点的topic 包括了topic 和 host
[*] self.conn.create_consumer(node_topic, self, fanout=False)
[*]
[*] self.conn.create_consumer(self.topic, self, fanout=True)
[*]
[*] # Consume from all consumers in a thread
[*] self.conn.consume_in_thread()
[*]
[*] if self.report_interval:
[*] pulse = utils.LoopingCall(self.report_state)#在265中可以找到report_state 他的作用是在存储中更新服务的状态
[*] pulse.start(interval=self.report_interval, now=False)
[*] self.timers.append(pulse)
[*]
[*] if self.periodic_interval:
[*] periodic = utils.LoopingCall(self.periodic_tasks)#在260行发现 Periodic_tasks任务在一个周期性间隔跑
[*] periodic.start(interval=self.periodic_interval, now=False)
[*] self.timers.append(periodic)
[*]
[*] def _create_service_ref(self, context):
[*] zone = FLAGS.node_availability_zone
[*] service_ref = db.service_create(context,
[*] {'host': self.host,
[*] 'binary': self.binary,
[*] 'topic': self.topic,
[*] 'report_count': 0,
[*] 'availability_zone': zone})
[*] self.service_id = service_ref['id']#猜测应该是获取当前服务的id
[*]
[*] def __getattr__(self, key):
[*] manager = self.__dict__.get('manager', None)
[*] return getattr(manager, key)
[*]
[*] @classmethod
[*] def create(cls, host=None, binary=None, topic=None, manager=None,
[*] report_interval=None, periodic_interval=None):
[*] """Instantiates class and passes back application object.
[*]
[*] :param host: defaults to FLAGS.host
[*] :param binary: defaults to basename of executable
[*] :param topic: defaults to bin_name - 'nova-' part
[*] :param manager: defaults to FLAGS.<topic>_manager
[*] :param report_interval: defaults to FLAGS.report_interval
[*] :param periodic_interval: defaults to FLAGS.periodic_interval
[*]
[*] """
[*] if not host:
[*] host = FLAGS.host
[*] if not binary:
[*] binary = os.path.basename(inspect.stack()[-1])
[*] if not topic:
[*] topic = binary.rpartition('nova-')
[*] if not manager:
[*] manager = FLAGS.get('%s_manager' % topic, None)
[*] if not report_interval:
[*] report_interval = FLAGS.report_interval
[*] if not periodic_interval:
[*] periodic_interval = FLAGS.periodic_interval
[*] service_obj = cls(host, binary, topic, manager,
[*] report_interval, periodic_interval)#此处 调用的是该类的init的默认函数
[*]
[*] return service_obj
[*]
[*] def kill(self):
[*] """Destroy the service object in the datastore."""
[*] self.stop()
[*] try:
[*] db.service_destroy(context.get_admin_context(), self.service_id)
[*] except exception.NotFound:
[*] logging.warn(_('Service killed that has no database entry'))
[*]
[*] def stop(self):
[*] # Try to shut the connection down, but if we get any sort of
[*] # errors, go ahead and ignore them.. as we're shutting down anyway
[*] try:
[*] self.conn.close()
[*] except Exception:
[*] pass
[*] for x in self.timers: #遍历曾经添加到self.timers中的每一个“间隔”(不是很清楚) 然后将其stop
[*] try:
[*] x.stop()
[*] except Exception:
[*] pass
[*] self.timers = [] #重新将self.timers置空
[*]
[*] def wait(self):
[*] for x in self.timers:
[*] try:
[*] x.wait()
[*] except Exception:
[*] pass
[*]
[*] def periodic_tasks(self):
[*] """Tasks to be run at a periodic interval."""
[*] #任务在一个周期性间隔跑
[*] self.manager.periodic_tasks(context.get_admin_context())
[*]
[*] def report_state(self):
[*] """Update the state of this service in the datastore."""
[*] #在数据存储更新服务的状态。
[*] ctxt = context.get_admin_context()
[*] try:
[*] try:
[*] service_ref = db.service_get(ctxt, self.service_id)
[*] except exception.NotFound:
[*] logging.debug(_('The service database object disappeared, '
[*] 'Recreating it.'))
[*] self._create_service_ref(ctxt)
[*] service_ref = db.service_get(ctxt, self.service_id)
[*]
[*] db.service_update(ctxt,
[*] self.service_id,
[*] {'report_count': service_ref['report_count'] + 1})
[*]
[*] # TODO(termie): make this pattern be more elegant.
[*] if getattr(self, 'model_disconnected', False):
[*] self.model_disconnected = False
[*] logging.error(_('Recovered model server connection!'))
[*]
[*] # TODO(vish): this should probably only catch connection errors
[*] except Exception:# pylint: disable=W0702
[*] if not getattr(self, 'model_disconnected', False):
[*] self.model_disconnected = True
[*] logging.exception(_('model server went away'))
[*]
[*]
[*]class WSGIService(object):
[*] """Provides ability to launch API from a 'paste' configuration."""
[*] #提供能够从一个‘paste’配置启动api的服务
[*]
[*] def __init__(self, name, loader=None):
[*] """Initialize, but do not start the WSGI server.
[*]
[*] :param name: The name of the WSGI server given to the loader.
[*] :param loader: Loads the WSGI application using the given name.
[*] :returns: None
[*]
[*] """
[*] #初始化 但是并没有开是wsgi的服务。
[*] self.name = name
[*] self.loader = loader or wsgi.Loader()
[*] self.app = self.loader.load_app(name)#将wsgi服务的名字给到self.loader,然后用那名字将其 装载到wsgi application
[*] self.host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0") #返回主机host getattr Found at: __builtin__
[*] #getattr(object, name[, default]) -> value得知返回的是value
[*]
[*]
[*] self.port = getattr(FLAGS, '%s_listen_port' % name, 0)#端口 port
[*] self.server = wsgi.Server(name,
[*] self.app,
[*] host=self.host,
[*] port=self.port)
[*]
[*] def start(self):
[*] """Start serving this service using loaded configuration.
[*]
[*] Also, retrieve updated port number in case '0' was passed in, which
[*] indicates a random port should be used.
[*]
[*] :returns: None
[*]
[*] """
[*] self.server.start()
[*] self.port = self.server.port
[*]
[*] def stop(self):
[*] """Stop serving this API.
[*]
[*] :returns: None
[*]
[*] """
[*] self.server.stop()
[*]
[*] def wait(self):
[*] """Wait for the service to stop serving this API.
[*]
[*] :returns: None
[*]
[*] """
[*] self.server.wait()
[*]
[*]
[*]# NOTE(vish): the global launcher is to maintain the existing
[*]# functionality of calling service.serve +
[*]# service.wait
[*]_launcher = None
[*]
[*]
[*]def serve(*servers):
[*] global _launcher
[*] if not _launcher:
[*] _launcher = Launcher() #s实例化Launcher
[*] for server in servers:
[*] _launcher.launch_server(server)
[*]
[*]
[*]def wait():
[*] # After we've loaded up all our dynamic bits, check
[*] # whether we should print help
[*]
[*]
[*] #flags.py中 有一句
[*] #FLAGS = FlagValues(),那么直接查看FlagValues()这个类,这个类是继承于gflags.FlagValues.
[*]
[*]
[*] flags.DEFINE_flag(flags.HelpFlag())
[*] flags.DEFINE_flag(flags.HelpshortFlag())
[*] flags.DEFINE_flag(flags.HelpXMLFlag())
[*] FLAGS.ParseNewFlags()
[*] logging.debug(_('Full set of FLAGS:'))
[*] for flag in FLAGS:
[*] flag_get = FLAGS.get(flag, None)
[*] logging.debug('%(flag)s : %(flag_get)s' % locals())
[*] try:
[*] _launcher.wait()
[*] except KeyboardInterrupt:
[*] _launcher.stop()
页:
[1]