gmf_forge_ai_shared_core.tools

Tools - Built-in tools and tool management.

1"""Tools - Built-in tools and tool management."""
2
3from gmf_forge_ai_shared_core.tools.mcp_server_registry import MCPServerRegistry
4
5__all__ = [
6    "MCPServerRegistry",
7]
class MCPServerRegistry:
18class MCPServerRegistry:
19    """
20    Registry for Model Context Protocol (MCP) servers.
21    
22    MCP servers provide structured interfaces for tools and resources.
23    """
24    
25    def __init__(self):
26        """Initialize MCP server registry."""
27        self._servers: Dict[str, MCPServerInfo] = {}
28    
29    def register(
30        self,
31        name: str,
32        endpoint: str,
33        capabilities: list[str],
34        version: str = "1.0",
35        **metadata: Any
36    ) -> None:
37        """Register an MCP server."""
38        self._servers[name] = MCPServerInfo(
39            name=name,
40            endpoint=endpoint,
41            capabilities=capabilities,
42            version=version,
43            metadata=metadata
44        )
45    
46    def get(self, name: str) -> Optional[MCPServerInfo]:
47        """Get MCP server information."""
48        return self._servers.get(name)
49    
50    def list_servers(self) -> list[MCPServerInfo]:
51        """List all registered MCP servers."""
52        return list(self._servers.values())
53    
54    def find_by_capability(self, capability: str) -> list[MCPServerInfo]:
55        """Find servers that provide a specific capability."""
56        return [
57            server for server in self._servers.values()
58            if capability in server.capabilities
59        ]

Registry for Model Context Protocol (MCP) servers.

MCP servers provide structured interfaces for tools and resources.

MCPServerRegistry()
25    def __init__(self):
26        """Initialize MCP server registry."""
27        self._servers: Dict[str, MCPServerInfo] = {}

Initialize MCP server registry.

def register( self, name: str, endpoint: str, capabilities: list[str], version: str = '1.0', **metadata: Any) -> None:
29    def register(
30        self,
31        name: str,
32        endpoint: str,
33        capabilities: list[str],
34        version: str = "1.0",
35        **metadata: Any
36    ) -> None:
37        """Register an MCP server."""
38        self._servers[name] = MCPServerInfo(
39            name=name,
40            endpoint=endpoint,
41            capabilities=capabilities,
42            version=version,
43            metadata=metadata
44        )

Register an MCP server.

def get( self, name: str) -> Optional[gmf_forge_ai_shared_core.tools.mcp_server_registry.MCPServerInfo]:
46    def get(self, name: str) -> Optional[MCPServerInfo]:
47        """Get MCP server information."""
48        return self._servers.get(name)

Get MCP server information.

def list_servers( self) -> list[gmf_forge_ai_shared_core.tools.mcp_server_registry.MCPServerInfo]:
50    def list_servers(self) -> list[MCPServerInfo]:
51        """List all registered MCP servers."""
52        return list(self._servers.values())

List all registered MCP servers.

def find_by_capability( self, capability: str) -> list[gmf_forge_ai_shared_core.tools.mcp_server_registry.MCPServerInfo]:
54    def find_by_capability(self, capability: str) -> list[MCPServerInfo]:
55        """Find servers that provide a specific capability."""
56        return [
57            server for server in self._servers.values()
58            if capability in server.capabilities
59        ]

Find servers that provide a specific capability.