deles 发表于 2018-6-2 14:42:23

openstack nova 源码分析3

  nova下的service.py的源码,今天阅读之后 直接就把我理解的以注释的形式添加到了源码中,有些地方不好或者是错了,希望大家帮我指出!


[*]
[*]import inspect
[*]import os
[*]
[*]import eventlet
[*]import greenlet
[*]
[*]from nova import context
[*]from nova import db
[*]from nova import exception
[*]from nova import flags
[*]from nova import log as logging
[*]from nova import rpc
[*]from nova import utils
[*]from nova import version
[*]from nova import wsgi
[*]
[*]
[*]LOG = logging.getLogger('nova.service')
[*]
[*]FLAGS = flags.FLAGS#flags.py中有一句 FLAGS = FlagValues(),那么直接查看FlagValues()这个类,会发现这个类是继承于gflags.FlagValues.
[*]
[*]flags.DEFINE_integer('report_interval', 10,
[*]                     'seconds between nodes reporting state to datastore',
[*]                     lower_bound=1)#参数名称,默认值和简短说明
[*]flags.DEFINE_integer('periodic_interval', 60,
[*]                     'seconds between running periodic tasks',
[*]                     lower_bound=1)
[*]flags.DEFINE_string('ec2_listen', "0.0.0.0",
[*]                  'IP address for EC2 API to listen')
[*]flags.DEFINE_integer('ec2_listen_port', 8773, 'port for ec2 api to listen')
[*]flags.DEFINE_string('osapi_listen', "0.0.0.0",
[*]                  'IP address for OpenStack API to listen')
[*]flags.DEFINE_integer('osapi_listen_port', 8774, 'port for os api to listen')
[*]flags.DEFINE_string('api_paste_config', "api-paste.ini",
[*]                  'File name for the paste.deploy config for nova-api')
[*]
[*]#Launcher 类 包含run_server ,launch_server,stop,wait 4个函数,其实功能非常简单,首先初始化Launcher将self_services弄成一个空列表
[*]#run_server就是开始和等待一个server的完成,launcher_server就是将该server加入到该类初始化的列表中,
[*]#stop 就是for循环遍历self_services列表中的服务然后一个一个的kill掉
[*]#wait 函数就是 Waits until all services have been stopped
[*]class Launcher(object):
[*]    """Launch one or more services and wait for them to complete."""
[*]
[*]    def __init__(self):
[*]      """Initialize the service launcher.
[*]
[*]      :returns: None
[*]
[*]      """
[*]      self._services = []
[*]
[*]    @staticmethod
[*]    def run_server(server):
[*]      """Start and wait for a server to finish.
[*]
[*]      :param service: Server to run and wait for.
[*]      :returns: None
[*]
[*]      """
[*]      server.start()
[*]      server.wait()
[*]
[*]    def launch_server(self, server):#在360行被调用
[*]      """Load and start the given server.
[*]
[*]      :param server: The server you would like to start.
[*]      :returns: None
[*]
[*]      """
[*]      gt = eventlet.spawn(self.run_server, server)
[*]      self._services.append(gt)
[*]
[*]    def stop(self):
[*]      """Stop all services which are currently running.
[*]
[*]      :returns: None
[*]
[*]      """
[*]      for service in self._services:
[*]            service.kill()
[*]
[*]    def wait(self):
[*]      """Waits until all services have been stopped, and then returns.
[*]
[*]      :returns: None
[*]
[*]      """
[*]      for service in self._services:
[*]            try:
[*]                service.wait()
[*]            except greenlet.GreenletExit:
[*]                pass
[*]
[*]
[*]class Service(object):
[*]    """Service object for binaries running on hosts.
[*]
[*]    A service takes a manager and enables rpc by listening to queues based
[*]    on topic. It also periodically runs tasks on the manager and reports
[*]    it state to the database services table."""
[*]
[*]    def __init__(self, host, binary, topic, manager, report_interval=None,
[*]               periodic_interval=None, *args, **kwargs):
[*]      self.host = host
[*]      self.binary = binary
[*]      self.topic = topic
[*]      #动态的生成manager类,并动态生成实例
[*]      self.manager_class_name = manager#在create函数中指定
[*]      manager_class = utils.import_class(self.manager_class_name)#动态的import该类
[*]      self.manager = manager_class(host=self.host, *args, **kwargs)#动态的生成实例
[*]      #设置参数:应该是服务间隔时间之类的。
[*]      self.report_interval = report_interval
[*]      self.periodic_interval = periodic_interval
[*]      #设置多出来的一些参数。
[*]      super(Service, self).__init__(*args, **kwargs)
[*]      self.saved_args, self.saved_kwargs = args, kwargs
[*]      #设置一个列表 不知道是不是后面有需要用的地方 果然在185行 发现了
[*]      self.timers = []
[*]
[*]    def start(self):
[*]      #设置版本
[*]      vcs_string = version.version_string_with_vcs()
[*]      logging.audit(_('Starting %(topic)s node (version %(vcs_string)s)'),
[*]                      {'topic': self.topic, 'vcs_string': vcs_string})
[*]      #初始化host
[*]      self.manager.init_host()
[*]         
[*]      self.model_disconnected = False
[*]      ctxt = context.get_admin_context()
[*]      try:
[*]            service_ref = db.service_get_by_args(ctxt,
[*]                                                 self.host,
[*]                                                 self.binary)
[*]            self.service_id = service_ref['id']
[*]      except exception.NotFound:
[*]            self._create_service_ref(ctxt)#该函数位于187行
[*]
[*]      if 'nova-compute' == self.binary:
[*]            self.manager.update_available_resource(ctxt)
[*]
[*]      self.conn = rpc.create_connection(new=True)
[*]      logging.debug("Creating Consumer connection for Service %s" %
[*]                      self.topic)
[*]
[*]      # Share this same connection for these Consumers
[*]      self.conn.create_consumer(self.topic, self, fanout=False)
[*]
[*]      node_topic = '%s.%s' % (self.topic, self.host)#节点的topic 包括了topic 和 host
[*]      self.conn.create_consumer(node_topic, self, fanout=False)
[*]
[*]      self.conn.create_consumer(self.topic, self, fanout=True)
[*]
[*]      # Consume from all consumers in a thread
[*]      self.conn.consume_in_thread()
[*]
[*]      if self.report_interval:
[*]            pulse = utils.LoopingCall(self.report_state)#在265中可以找到report_state 他的作用是在存储中更新服务的状态
[*]            pulse.start(interval=self.report_interval, now=False)
[*]            self.timers.append(pulse)
[*]
[*]      if self.periodic_interval:
[*]            periodic = utils.LoopingCall(self.periodic_tasks)#在260行发现 Periodic_tasks任务在一个周期性间隔跑
[*]            periodic.start(interval=self.periodic_interval, now=False)
[*]            self.timers.append(periodic)
[*]
[*]    def _create_service_ref(self, context):
[*]      zone = FLAGS.node_availability_zone
[*]      service_ref = db.service_create(context,
[*]                                        {'host': self.host,
[*]                                       'binary': self.binary,
[*]                                       'topic': self.topic,
[*]                                       'report_count': 0,
[*]                                       'availability_zone': zone})
[*]      self.service_id = service_ref['id']#猜测应该是获取当前服务的id
[*]
[*]    def __getattr__(self, key):
[*]      manager = self.__dict__.get('manager', None)
[*]      return getattr(manager, key)
[*]
[*]    @classmethod
[*]    def create(cls, host=None, binary=None, topic=None, manager=None,
[*]               report_interval=None, periodic_interval=None):
[*]      """Instantiates class and passes back application object.
[*]
[*]      :param host: defaults to FLAGS.host
[*]      :param binary: defaults to basename of executable
[*]      :param topic: defaults to bin_name - 'nova-' part
[*]      :param manager: defaults to FLAGS.<topic>_manager
[*]      :param report_interval: defaults to FLAGS.report_interval
[*]      :param periodic_interval: defaults to FLAGS.periodic_interval
[*]
[*]      &quot;&quot;&quot;
[*]      if not host:
[*]            host = FLAGS.host
[*]      if not binary:
[*]            binary = os.path.basename(inspect.stack()[-1])
[*]      if not topic:
[*]            topic = binary.rpartition('nova-')
[*]      if not manager:
[*]            manager = FLAGS.get('%s_manager' % topic, None)
[*]      if not report_interval:
[*]            report_interval = FLAGS.report_interval
[*]      if not periodic_interval:
[*]            periodic_interval = FLAGS.periodic_interval
[*]      service_obj = cls(host, binary, topic, manager,
[*]                        report_interval, periodic_interval)#此处 调用的是该类的init的默认函数      
[*]
[*]      return service_obj
[*]
[*]    def kill(self):
[*]      &quot;&quot;&quot;Destroy the service object in the datastore.&quot;&quot;&quot;
[*]      self.stop()
[*]      try:
[*]            db.service_destroy(context.get_admin_context(), self.service_id)
[*]      except exception.NotFound:
[*]            logging.warn(_('Service killed that has no database entry'))
[*]
[*]    def stop(self):
[*]      # Try to shut the connection down, but if we get any sort of
[*]      # errors, go ahead and ignore them.. as we're shutting down anyway
[*]      try:
[*]            self.conn.close()
[*]      except Exception:
[*]            pass
[*]      for x in self.timers:   #遍历曾经添加到self.timers中的每一个“间隔”(不是很清楚) 然后将其stop
[*]            try:
[*]                x.stop()
[*]            except Exception:
[*]                pass
[*]      self.timers = [] #重新将self.timers置空
[*]
[*]    def wait(self):
[*]      for x in self.timers:
[*]            try:
[*]                x.wait()
[*]            except Exception:
[*]                pass
[*]
[*]    def periodic_tasks(self):
[*]      &quot;&quot;&quot;Tasks to be run at a periodic interval.&quot;&quot;&quot;
[*]      #任务在一个周期性间隔跑
[*]      self.manager.periodic_tasks(context.get_admin_context())
[*]
[*]    def report_state(self):
[*]      &quot;&quot;&quot;Update the state of this service in the datastore.&quot;&quot;&quot;
[*]      #在数据存储更新服务的状态。
[*]      ctxt = context.get_admin_context()
[*]      try:
[*]            try:
[*]                service_ref = db.service_get(ctxt, self.service_id)
[*]            except exception.NotFound:
[*]                logging.debug(_('The service database object disappeared, '
[*]                              'Recreating it.'))
[*]                self._create_service_ref(ctxt)
[*]                service_ref = db.service_get(ctxt, self.service_id)
[*]
[*]            db.service_update(ctxt,
[*]                           self.service_id,
[*]                           {'report_count': service_ref['report_count'] + 1})
[*]
[*]            # TODO(termie): make this pattern be more elegant.
[*]            if getattr(self, 'model_disconnected', False):
[*]                self.model_disconnected = False
[*]                logging.error(_('Recovered model server connection!'))
[*]
[*]      # TODO(vish): this should probably only catch connection errors
[*]      except Exception:# pylint: disable=W0702
[*]            if not getattr(self, 'model_disconnected', False):
[*]                self.model_disconnected = True
[*]                logging.exception(_('model server went away'))
[*]
[*]
[*]class WSGIService(object):
[*]    &quot;&quot;&quot;Provides ability to launch API from a 'paste' configuration.&quot;&quot;&quot;
[*]    #提供能够从一个‘paste’配置启动api的服务
[*]
[*]    def __init__(self, name, loader=None):
[*]      &quot;&quot;&quot;Initialize, but do not start the WSGI server.
[*]
[*]      :param name: The name of the WSGI server given to the loader.
[*]      :param loader: Loads the WSGI application using the given name.
[*]      :returns: None
[*]
[*]      &quot;&quot;&quot;
[*]      #初始化 但是并没有开是wsgi的服务。
[*]      self.name = name
[*]      self.loader = loader or wsgi.Loader()
[*]      self.app = self.loader.load_app(name)#将wsgi服务的名字给到self.loader,然后用那名字将其 装载到wsgi application
[*]      self.host = getattr(FLAGS, '%s_listen' % name, &quot;0.0.0.0&quot;) #返回主机host getattr Found at: __builtin__
[*]      #getattr(object, name[, default]) -> value得知返回的是value
[*]
[*]
[*]      self.port = getattr(FLAGS, '%s_listen_port' % name, 0)#端口 port
[*]      self.server = wsgi.Server(name,
[*]                                  self.app,
[*]                                  host=self.host,
[*]                                  port=self.port)
[*]
[*]    def start(self):
[*]      &quot;&quot;&quot;Start serving this service using loaded configuration.
[*]
[*]      Also, retrieve updated port number in case '0' was passed in, which
[*]      indicates a random port should be used.
[*]
[*]      :returns: None
[*]
[*]      &quot;&quot;&quot;
[*]      self.server.start()
[*]      self.port = self.server.port
[*]
[*]    def stop(self):
[*]      &quot;&quot;&quot;Stop serving this API.
[*]
[*]      :returns: None
[*]
[*]      &quot;&quot;&quot;
[*]      self.server.stop()
[*]
[*]    def wait(self):
[*]      &quot;&quot;&quot;Wait for the service to stop serving this API.
[*]
[*]      :returns: None
[*]
[*]      &quot;&quot;&quot;
[*]      self.server.wait()
[*]
[*]
[*]# NOTE(vish): the global launcher is to maintain the existing
[*]#             functionality of calling service.serve +
[*]#             service.wait
[*]_launcher = None
[*]
[*]
[*]def serve(*servers):
[*]    global _launcher
[*]    if not _launcher:
[*]      _launcher = Launcher() #s实例化Launcher
[*]    for server in servers:
[*]      _launcher.launch_server(server)
[*]
[*]
[*]def wait():
[*]    # After we've loaded up all our dynamic bits, check
[*]    # whether we should print help
[*]      
[*]
[*]    #flags.py中 有一句
[*]    #FLAGS = FlagValues(),那么直接查看FlagValues()这个类,这个类是继承于gflags.FlagValues.
[*]
[*]
[*]    flags.DEFINE_flag(flags.HelpFlag())
[*]    flags.DEFINE_flag(flags.HelpshortFlag())
[*]    flags.DEFINE_flag(flags.HelpXMLFlag())
[*]    FLAGS.ParseNewFlags()
[*]    logging.debug(_('Full set of FLAGS:'))
[*]    for flag in FLAGS:
[*]      flag_get = FLAGS.get(flag, None)
[*]      logging.debug('%(flag)s : %(flag_get)s' % locals())
[*]    try:
[*]      _launcher.wait()
[*]    except KeyboardInterrupt:
[*]      _launcher.stop()
页: [1]
查看完整版本: openstack nova 源码分析3