Collectives™ on Stack Overflow
Find centralized, trusted content and collaborate around the technologies you use most.
Learn more about Collectives
Teams
Q&A for work
Connect and share knowledge within a single location that is structured and easy to search.
Learn more about Teams
Im using the python SQL parser to get the table information. Im able to get the table name and schema name.
import sqlparse
line = '''
CREATE TABLE public.actor (
actor_id integer DEFAULT nextval('public.actor_actor_id_seq'::regclass) NOT NULL,
first_name character varying(45) NOT NULL,
last_name character varying(45) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
CREATE TABLE public.category (
category_id integer DEFAULT nextval('public.category_category_id_seq'::regclass) NOT NULL,
name character varying(25) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
CREATE TABLE IF NOT EXISTS "sample_schema"."sample_table"
"div_cd" VARCHAR(2) NOT NULL
,"div_name" VARCHAR(30) NOT NULL
,"org_cd" VARCHAR(8) NOT NULL
,"org_name" VARCHAR(60) NOT NULL
,"team_cd" VARCHAR(2) NOT NULL
,"team_name" VARCHAR(120) NOT NULL
,"personal_cd" VARCHAR(7) NOT NULL
,"personal_name" VARCHAR(300) NOT NULL
,"username" VARCHAR(6) NOT NULL
,"staff_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
DISTSTYLE EVEN
CREATE TABLE IF NOT EXISTS "sample_schema"."ref_table"
"staff_flg" CHAR(1) DEFAULT '0'::bpchar SORTKEY ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
DISTSTYLE EVEN
parse = sqlparse.parse(line)
print([str(t) for t in parse[0].tokens if t.ttype is None][0])
Output:
public.actor
But if I want to return the column name and the data type which token I can use for printing both the two DDL.
The output is something like this,[not exactly the same :)]
table: public.actor
print column name and data type one by one(maybe in a for loop)
column: actor_id
date type: integer
column: first_name
data type: character varying
line = '''
CREATE TABLE public.actor (
actor_id integer DEFAULT nextval('public.actor_actor_id_seq'::regclass) NOT NULL,
first_name character varying(45) NOT NULL,
last_name character varying(45) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
CREATE TABLE public.category (
category_id integer DEFAULT nextval('public.category_category_id_seq'::regclass) NOT NULL,
name character varying(25) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
CREATE TABLE IF NOT EXISTS "sample_schema"."sample_table"
"div_cd" VARCHAR(2) NOT NULL
,"div_name" VARCHAR(30) NOT NULL
,"org_cd" VARCHAR(8) NOT NULL
,"org_name" VARCHAR(60) NOT NULL
,"team_cd" VARCHAR(2) NOT NULL
,"team_name" VARCHAR(120) NOT NULL
,"personal_cd" VARCHAR(7) NOT NULL
,"personal_name" VARCHAR(300) NOT NULL
,"username" VARCHAR(6) NOT NULL
,"staff_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
DISTSTYLE EVEN
CREATE TABLE IF NOT EXISTS "sample_schema"."ref_table"
"staff_flg" CHAR(1) DEFAULT '0'::bpchar SORTKEY ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
DISTSTYLE EVEN
def get_table_name(tokens):
for token in reversed(tokens):
if token.ttype is None:
return token.value
return " "
parse = sqlparse.parse(line)
for stmt in parse:
# Get all the tokens except whitespaces
tokens = [t for t in sqlparse.sql.TokenList(stmt.tokens) if t.ttype != sqlparse.tokens.Whitespace]
is_create_stmt = False
for i, token in enumerate(tokens):
# Is it a create statements ?
if token.match(sqlparse.tokens.DDL, 'CREATE'):
is_create_stmt = True
continue
# If it was a create statement and the current token starts with "("
if is_create_stmt and token.value.startswith("("):
# Get the table name by looking at the tokens in reverse order till you find
# a token with None type
print (f"table: {get_table_name(tokens[:i])}")
# Now parse the columns
txt = token.value
columns = txt[1:txt.rfind(")")].replace("\n","").split(",")
for column in columns:
c = ' '.join(column.split()).split()
c_name = c[0].replace('\"',"")
c_type = c[1] # For condensed type information
#c_type = " ".join(c[1:]) # For detailed type information
print (f"column: {c_name}")
print (f"date type: {c_type}")
print ("---"*20)
break
Output:
table: public.actor
column: actor_id
date type: integer
column: first_name
date type: character
column: last_name
date type: character
column: last_update
date type: timestamp
------------------------------------------------------------
table: public.category
column: category_id
date type: integer
column: name
date type: character
column: last_update
date type: timestamp
------------------------------------------------------------
table: "sample_schema"."sample_table"
column: div_cd
date type: VARCHAR(2)
column: div_name
date type: VARCHAR(30)
column: org_cd
date type: VARCHAR(8)
column: org_name
date type: VARCHAR(60)
column: team_cd
date type: VARCHAR(2)
column: team_name
date type: VARCHAR(120)
column: personal_cd
date type: VARCHAR(7)
column: personal_name
date type: VARCHAR(300)
column: username
date type: VARCHAR(6)
column: staff_flg
date type: CHAR(1)
column: leader_flg
date type: CHAR(1)
------------------------------------------------------------
table: "sample_schema"."ref_table"
column: staff_flg
date type: CHAR(1)
column: leader_flg
date type: CHAR(1)
------------------------------------------------------------
–
–
–
–
You can use a function from a script available in python-sqlparse examples to extract the data:
def extract_definitions(token_list):
# assumes that token_list is a parenthesis
definitions = []
tmp = []
par_level = 0
for token in token_list.flatten():
if token.is_whitespace:
continue
elif token.match(sqlparse.tokens.Punctuation, '('):
par_level += 1
continue
if token.match(sqlparse.tokens.Punctuation, ')'):
if par_level == 0:
break
else:
par_level += 1
elif token.match(sqlparse.tokens.Punctuation, ','):
if tmp:
definitions.append(tmp)
tmp = []
else:
tmp.append(token)
if tmp:
definitions.append(tmp)
return definitions
and use for-loop to print information about columns names and their data types:
parsed = sqlparse.parse(line)[0]
# extract the parenthesis which holds column definitions
_, par = parsed.token_next_by(i=sqlparse.sql.Parenthesis)
columns = extract_definitions(par)
for column in columns:
print(f"column: {column[0]}")
print(f"data type: {' '.join(str(t) for t in column[1:])}")
This code produces the following output:
column: actor_id
data type: integer DEFAULT nextval 'public.actor_actor_id_seq' :: regclass NOT NULL
column: first_name
data type: character varying 45 NOT NULL
column: last_name
data type: character varying 45 NOT NULL
column: last_update
data type: timestamp without time zone DEFAULT now NOT NULL
It is actually a little bit more information than you wanted. However, those strings should be easily ''regexable'' to extract only basic data types.
As already mentioned in comments, using special information (e.g. DISTSTYLE
) causes parser not to recognize sqlparse.sql.Parenthesis
instance and was reported as a bug. Thus, such information needs to be removed from the SQL query before parsing.
–
–
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.