docling_extract.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. #!/usr/bin/env python3
  2. import argparse
  3. import csv
  4. import sys
  5. import tempfile
  6. from pathlib import Path
  7. from io import StringIO
  8. from docling.document_converter import DocumentConverter
  9. def normalize_tables_to_html(markdown_text: str) -> str:
  10. # Docling returns markdown; downstream Java extractor expects HTML tables
  11. # marked with data-extraction attribute.
  12. lines = markdown_text.splitlines()
  13. out_lines = []
  14. in_table = False
  15. for line in lines:
  16. if "|" in line and line.strip().startswith("|") and line.strip().endswith("|"):
  17. if not in_table:
  18. out_lines.append('<table data-extraction="docling">')
  19. in_table = True
  20. cells = [cell.strip() for cell in line.strip().strip("|").split("|")]
  21. if all(cell.startswith("-") for cell in cells):
  22. continue
  23. row = "".join(f"<td>{cell}</td>" for cell in cells)
  24. out_lines.append(f"<tr>{row}</tr>")
  25. else:
  26. if in_table:
  27. out_lines.append("</table>")
  28. in_table = False
  29. if in_table:
  30. out_lines.append("</table>")
  31. return "\n".join(out_lines).strip()
  32. def parse_markdown_tables(markdown_text: str) -> list[list[list[str]]]:
  33. tables: list[list[list[str]]] = []
  34. current_table: list[list[str]] = []
  35. for line in markdown_text.splitlines():
  36. stripped = line.strip()
  37. if not stripped:
  38. # Docling markdown can include blank lines inside table regions.
  39. # Do not break the current table because of empty lines.
  40. continue
  41. is_table_line = "|" in stripped and stripped.startswith("|") and stripped.endswith("|")
  42. if not is_table_line:
  43. if current_table:
  44. tables.append(current_table)
  45. current_table = []
  46. continue
  47. cells = [cell.strip() for cell in stripped.strip("|").split("|")]
  48. if all(cell.startswith("-") for cell in cells):
  49. continue
  50. current_table.append(cells)
  51. if current_table:
  52. tables.append(current_table)
  53. return tables
  54. def normalize_tables_to_csv(markdown_text: str) -> str:
  55. tables = parse_markdown_tables(markdown_text)
  56. if not tables:
  57. return ""
  58. buffer = StringIO()
  59. # Force LF line endings to avoid double-spacing on Windows consumers.
  60. writer = csv.writer(buffer, lineterminator="\n")
  61. for table in tables:
  62. for row in table:
  63. writer.writerow(row)
  64. csv_text = buffer.getvalue().strip()
  65. return csv_text.replace("\r\n", "\n").replace("\r", "\n")
  66. def read_input(args: argparse.Namespace) -> str:
  67. if args.input_file:
  68. return str(Path(args.input_file).resolve())
  69. if args.stdin:
  70. html_text = sys.stdin.read()
  71. with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", suffix=".html", delete=False) as tmp_file:
  72. tmp_file.write(html_text)
  73. return tmp_file.name
  74. raise ValueError("Either --input-file or --stdin must be provided")
  75. def main() -> int:
  76. parser = argparse.ArgumentParser(description="Extract HTML tables using Docling")
  77. parser.add_argument("--input-file", help="Path to input file")
  78. parser.add_argument("--stdin", action="store_true", help="Read UTF-8 text from stdin")
  79. parser.add_argument("--output", choices=["markdown", "html", "csv"], default="markdown", help="Output format")
  80. args = parser.parse_args()
  81. temp_path = None
  82. try:
  83. source = read_input(args)
  84. if args.stdin:
  85. temp_path = source
  86. converter = DocumentConverter()
  87. result = converter.convert(source)
  88. markdown_output = result.document.export_to_markdown()
  89. if args.output == "markdown":
  90. output_payload = markdown_output
  91. elif args.output == "csv":
  92. output_payload = normalize_tables_to_csv(markdown_output)
  93. else:
  94. output_payload = normalize_tables_to_html(markdown_output)
  95. sys.stdout.write(output_payload)
  96. return 0
  97. except Exception as exc: # pylint: disable=broad-except
  98. sys.stderr.write(f"DOCLING_ERROR: {exc}\n")
  99. return 1
  100. finally:
  101. if temp_path:
  102. Path(temp_path).unlink(missing_ok=True)
  103. if __name__ == "__main__":
  104. raise SystemExit(main())