sqlite相关操作

| 分类 插件  | 标签 sqlite  浏览次数: -

常用操作

  • 设置查询结果分行打印

.mode line

  • 查看所有的表

.table

  • 退出

.exit

  • 打印

.print xxx

  • 查看建表语句

.schema

  • 查询表字段

select * from sqlite_master where type='table' and name='{table_name}';

  • 插入数据

insert into {table_name} values ('1');

  • 避免交互式操作

sqlite3 fluentd.db ".table"

python脚本建表

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
#=============================================================================
#  ProjectName: fluentd
#     FileName: create
#         Desc: 通过sqlalchemy创建表
#       Author: seekplum
#        Email: 1131909224m@sina.cn
#     HomePage: seekplum.github.io
#       Create: 2018-12-12 19:23
#=============================================================================
"""
import json
import subprocess

from datetime import datetime

from sqlalchemy import create_engine
from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import PickleType
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Text
from sqlalchemy import VARCHAR
from sqlalchemy.ext.mutable import Mutable
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import MetaData
from sqlalchemy.orm import sessionmaker

# SQLite database file; the same name is passed to the sqlite3 command line tool
# by get_table_schema().
db_file = "fluentd.db"

# Engine created from a sqlite:/// URL that points at db_file.
engine = create_engine("sqlite:///{}".format(db_file))
# Session factory bound to the engine, with autocommit, autoflush and
# expire_on_commit all switched off.
Session = sessionmaker(bind=engine, autocommit=False, autoflush=False, expire_on_commit=False)
# Metadata bound to the engine; it is handed to declarative_base() so the model
# tables are registered on it.
metadata = MetaData(bind=engine)

# Module-level session, created at import time.
session = Session()


class ModelMixin(object):
    """Base-class mixin for the declarative models.

    It adds ``obj["column"]`` access, returns unicode attribute values as
    byte strings, and provides class-level helpers that create, drop or reset
    the table behind a model.
    """

    @staticmethod
    def _to_native_str(value):
        """Return unicode *value* as a UTF-8 byte string; pass anything else through.

        ``str(value)`` would use the ASCII codec and raise UnicodeEncodeError
        as soon as the text contains a non-ASCII character. Encoding as UTF-8
        (the encoding declared for this file) gives the same result for ASCII
        text and also works for everything else.
        """
        if isinstance(value, unicode):
            return value.encode("utf-8")
        return value

    def __getitem__(self, key):
        """Return the attribute named *key*, so ``obj[key]`` equals ``getattr(obj, key)``."""
        return ModelMixin._to_native_str(getattr(self, key))

    def __getattribute__(self, key):
        """Look up *key* normally and convert a unicode result to a byte string."""
        value = super(ModelMixin, self).__getattribute__(key)
        # Use the class, not self, to reach the helper: self._to_native_str
        # would re-enter __getattribute__ for nothing.
        return ModelMixin._to_native_str(value)

    @classmethod
    def initialize(cls):
        """Drop the model's table and create it again.

        :rtype: bool
        :return: False when the table is not registered in ``metadata`` or when
                 dropping/creating raises; True otherwise
        """
        table_name = cls.__table__.name
        try:
            table = metadata.tables[table_name]
        except KeyError:
            return False
        try:
            metadata.drop_all(tables=[table])
            metadata.create_all(tables=[table])
        except Exception:
            # Deliberate best effort: callers only get a success flag.
            return False
        return True

    @classmethod
    def drop_table(cls):
        """Drop the model's table if it is one of the tables known to ``metadata``."""
        if cls.__table__ in metadata.sorted_tables:
            cls.__table__.drop(engine)

    @classmethod
    def create_table(cls):
        """Create the model's table through ``metadata.create_all``.

        :rtype: bool
        :return: False when the table is not registered in ``metadata``,
                 True once ``create_all`` has returned
        """
        table_name = cls.__table__.name
        try:
            table = metadata.tables[table_name]
        except KeyError:
            return False
        metadata.create_all(tables=[table])
        # Report success explicitly; falling off the end returned None, which
        # a caller could not tell apart from the False failure value by truth.
        return True


class MutableDict(Mutable, dict):
    """A dict that notifies SQLAlchemy's mutation tracking when it is modified."""

    @classmethod
    def coerce(cls, key, value):
        """Convert plain dicts assigned to the attribute into MutableDict.

        :param key: name of the attribute being set
        :param value: value being assigned
        :return: *value* itself when it already is a MutableDict, a MutableDict
                 copy when it is a plain dict; anything else is delegated to
                 ``Mutable.coerce`` (which, per the SQLAlchemy documentation,
                 raises ValueError)
        """
        if not isinstance(value, MutableDict):
            if isinstance(value, dict):
                return MutableDict(value)
            return Mutable.coerce(key, value)
        else:
            return value

    def __delitem__(self, key):
        """Delete *key* and flag the change.

        The method must be spelled ``__delitem__``: with only the leading
        underscores the name is mangled to ``_MutableDict__delitem``, ``del d[k]``
        keeps using ``dict.__delitem__`` and the deletion is never reported.
        """
        dict.__delitem__(self, key)
        self.changed()

    def __setitem__(self, key, value):
        """Store *value* under *key* and flag the change."""
        dict.__setitem__(self, key, value)
        self.changed()

    def __getstate__(self):
        """Pickle only the plain dict contents."""
        return dict(self)

    def __setstate__(self, state):
        """Restore the contents produced by ``__getstate__``."""
        # Load the pickled state; updating from self would restore nothing.
        self.update(state)


# Declarative base for all models: tables are registered on the module-level
# metadata and every model inherits the ModelMixin helpers.
Entity = declarative_base(name="Entity", metadata=metadata, cls=ModelMixin)


class Target(Entity):
    """target 配置表
    """

    # English: "target configuration table".
    # NOTE: the class docstring and every ``doc=`` string below are read at
    # runtime by get_column_doc() and emitted as SQL comments, so they are
    # kept verbatim rather than translated.

    __tablename__ = "target"

    # Auto-increment integer primary key (also given an explicit index).
    id = Column(Integer, nullable=False, autoincrement=True, index=True, primary_key=True, doc="target 表id")
    # Identifier of the log file; per its doc text it is shown in place of the
    # log path by default. Unique and indexed.
    tag = Column(VARCHAR(length=128), nullable=False, unique=True, index=True, doc="日志文件标识,默认为用于替换日志路径进行展示")
    # Path of the log file being collected. Unique and indexed.
    log_path = Column(VARCHAR(length=128), nullable=False, unique=True, index=True, doc="被采集的日志路径")
    # Path of the td-agent conf file. Unique.
    conf_path = Column(VARCHAR(length=128), nullable=False, unique=True, doc="td-agent conf配置文件路径")

    # Disabled alternative that would store the content as a mutable dict
    # pickled with json:
    # context = Column(MutableDict.as_mutable(PickleType(pickler=json)), doc="配置文件内容")
    # Content of the configuration file, stored as text.
    context = Column(Text, nullable=False, doc="配置文件内容")

    # Local time of first creation; datetime.now is the column default.
    create_time = Column(DateTime, default=datetime.now, doc="首次创建时的本地时间")


class Elastic(Entity):
    """elastic 服务器配置表
    """

    # English: "elastic server configuration table".
    # NOTE: the class docstring and every ``doc=`` string below are read at
    # runtime by get_column_doc() and emitted as SQL comments, so they are
    # kept verbatim rather than translated.

    __tablename__ = "elastic"

    # Auto-increment integer primary key (also given an explicit index).
    id = Column(Integer, nullable=False, autoincrement=True, index=True, primary_key=True, doc="elastic 表id")

    # IP address of the elastic server; length 39 so an IPv6 address fits.
    ip = Column(String(length=39), nullable=False, index=True, doc="elastic服务器的ip地址,长度兼容ipv6")
    # Port of the elastic server.
    port = Column(Integer, nullable=False, index=True, doc="elastic服务器的端口号")
    # Content of the configuration file, stored as text.
    context = Column(Text, nullable=False, doc="配置文件内容")
    # Local time of first creation; datetime.now is the column default.
    create_time = Column(DateTime, default=datetime.now, doc="首次创建时的本地时间")

    # Foreign key to target.id, declared with ON DELETE CASCADE.
    target_id = Column(Integer, ForeignKey("target.id", ondelete="CASCADE"), index=True, doc="外键,关联 target 表的主键 id")


class ElasticContext(Entity):
    """elastic 配置文件内容 同一个 elastic 配置文件内容,会写在多个路径中
    """

    # English: "elastic configuration file content; the same elastic
    # configuration content is written to several paths".
    # NOTE: the class docstring and every ``doc=`` string below are read at
    # runtime by get_column_doc() and emitted as SQL comments, so they are
    # kept verbatim rather than translated.

    __tablename__ = "elastic_path"
    # Path of an elastic conf file; this is the table's primary key.
    conf_path = Column(VARCHAR(length=128), nullable=False, index=True, primary_key=True, doc="elastic conf配置文件路径")
    # Local time of first creation; datetime.now is the column default.
    create_time = Column(DateTime, default=datetime.now, doc="首次创建时的本地时间")

    # Foreign key to elastic.id, declared with ON DELETE CASCADE.
    elastic_id = Column(Integer, ForeignKey("elastic.id", ondelete="CASCADE"), index=True,
                        doc="外键,关联 elastic 表的主键 id")


class RunCmdError(Exception):
    """Error carrying the captured output of a command that failed."""

    def __init__(self, message, out_msg, err_msg):
        """Keep both output streams next to the error message.

        :param message: description of the failure
        :param out_msg: output the command produced
        :param err_msg: error output the command produced
        """
        Exception.__init__(self, message)
        self.out_msg, self.err_msg = out_msg, err_msg


def run_cmd(cmd):
    """Run a command through the shell and return what it wrote to stdout.

    :param cmd: shell command line
    :type cmd: str
    :example cmd: hostname

    :rtype: str
    :return: everything the command wrote to stdout

    :raise RunCmdError: the command exited with a non-zero status; the
                        exception carries the captured stdout and stderr

    NOTE(review): ``cmd`` is handed to the shell as-is (``shell=True``), so it
    must never be built from untrusted input.
    """
    p = subprocess.Popen(cmd, shell=True, close_fds=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    # communicate() drains both pipes together and closes stdin. Reading
    # stdout to EOF before touching stderr deadlocks as soon as the child
    # fills the stderr pipe buffer, and an open stdin keeps any command that
    # reads it waiting forever.
    out_msg, err_msg = p.communicate()
    if p.returncode != 0:
        message = "run {} failed".format(cmd)
        raise RunCmdError(message=message, out_msg=out_msg, err_msg=err_msg)
    return out_msg


def get_column_doc(table):
    """Collect the comment texts for a model's table and columns.

    :param table: declarative model class
    :rtype: dict
    :return: one entry per column key holding that column's ``doc`` text, plus
             one entry keyed by the table name holding the stripped class
             docstring (an empty string when the class has no docstring)
    """
    data = {
        column: getattr(table, column).comparator.doc for column in table.__table__.columns.keys()
    }
    # A class defined without a docstring has __doc__ set to None, and
    # None.strip() would raise AttributeError.
    data[table.__table__.name] = (table.__doc__ or "").strip()
    return data


def get_table_schema(table_name):
    """Return the output of the sqlite3 ``.schema`` command for *table_name*.

    The dot-commands are fed to the sqlite3 command line tool through a shell
    here-document, so no interactive session is needed.
    """
    template = "\n".join([
        "sqlite3 {db_file} <<EOF",
        ".schema {table_name}",
        ".exit",
        "EOF",
    ])
    return run_cmd(template.format(db_file=db_file, table_name=table_name))


def append_column_doc(docs, output):
    """Append ``-- comment`` annotations to the lines of a schema dump.

    :param docs: mapping of table name / column name to comment text
    :type docs: dict
    :param output: schema text, one statement fragment per line
    :type output: str

    :rtype: str
    :return: the same lines joined with newlines; a ``CREATE TABLE <name> (``
             line whose table name is in *docs*, or a line whose first word is
             in *docs*, gets `` -- <comment>`` appended, other lines are kept
             unchanged
    """
    result = []
    for line in output.splitlines():
        data = line.split()
        if not data:
            # Blank or whitespace-only line: nothing to annotate, and data[0]
            # below would raise IndexError.
            result.append(line)
            continue

        # "CREATE TABLE <name> (" splits into exactly four words.
        is_create_table = len(data) == 4 and data[0].lower() == "create" and data[1].lower() == "table"
        if is_create_table and data[2] in docs:
            # Table comment
            doc = docs[data[2]]
        elif data[0] in docs:
            # Column comment
            doc = docs[data[0]]
        else:
            result.append(line)
            continue
        result.append("{line} -- {doc}".format(line=line, doc=doc))

    return "\n".join(result)


def get_schema():
    """Print the commented CREATE statements of every model table."""
    for model in (Target, Elastic, ElasticContext):
        ddl = get_table_schema(model.__table__.name)
        comments = get_column_doc(model)
        print(append_column_doc(comments, ddl))


def create_table():
    """Create the tables of all models, referenced tables first."""
    for model in (Target, Elastic, ElasticContext):
        model.create_table()


def main():
    """Create the tables, then print their schema with comments appended."""
    create_table()
    get_schema()


# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

sql建表

CREATE TABLE target ( -- target 配置表
    id INTEGER NOT NULL,  -- target 表id
    tag VARCHAR(128) NOT NULL,  -- 日志文件标识,默认为用于替换日志路径进行展示
    log_path VARCHAR(128) NOT NULL,  -- 被采集的日志路径
    conf_path VARCHAR(128) NOT NULL,  -- td-agent conf配置文件路径
    context TEXT NOT NULL,  -- 配置文件内容
    create_time DATETIME,  -- 首次创建时的本地时间
    PRIMARY KEY (id), 
    UNIQUE (conf_path)
);
CREATE UNIQUE INDEX ix_target_tag ON target (tag);
CREATE INDEX ix_target_id ON target (id);
CREATE UNIQUE INDEX ix_target_log_path ON target (log_path);
CREATE TABLE elastic ( -- elastic 服务器配置表
    id INTEGER NOT NULL,  -- elastic 表id
    ip VARCHAR(39) NOT NULL,  -- elastic服务器的ip地址,长度兼容ipv6
    port INTEGER NOT NULL,  -- elastic服务器的端口号
    context TEXT NOT NULL,  -- 配置文件内容
    create_time DATETIME,  -- 首次创建时的本地时间
    target_id INTEGER,  -- 外键,关联 target 表的主键 id
    PRIMARY KEY (id), 
    FOREIGN KEY(target_id) REFERENCES target (id) ON DELETE CASCADE
);
CREATE INDEX ix_elastic_target_id ON elastic (target_id);
CREATE INDEX ix_elastic_ip ON elastic (ip);
CREATE INDEX ix_elastic_id ON elastic (id);
CREATE INDEX ix_elastic_port ON elastic (port);
CREATE TABLE elastic_path ( -- elastic 配置文件内容 同一个 elastic 配置文件内容,会写在多个路径中
    conf_path VARCHAR(128) NOT NULL,  -- elastic conf配置文件路径
    create_time DATETIME,  -- 首次创建时的本地时间
    elastic_id INTEGER,  -- 外键,关联 elastic 表的主键 id
    PRIMARY KEY (conf_path), 
    FOREIGN KEY(elastic_id) REFERENCES elastic (id) ON DELETE CASCADE
);
CREATE INDEX ix_elastic_path_conf_path ON elastic_path (conf_path);
CREATE INDEX ix_elastic_path_elastic_id ON elastic_path (elastic_id);

上一篇 HTTP状态码     下一篇 Tornado源码分析<二>
目录导航