添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

有需求使用DataX将HBase表中数据迁移到MySQL中,有的表在Phoenix中定义,使用hbase20xsqlreader读取。有的没有在Phoenix中定义,数据是动态插入,列是不固定的。

  • hbase20xsqlreader,读取Phoenix
  • hbase11xreader,读取HBase
  • 不管读取HBase还是Phoenix都要求填写column,但是可以写sql占位,传进来。

    我这里由于业务原因就没有使用占位符的方式传递。

    现在的业务需求是我HBase横表迁移到MySQL纵表中。

    动态列中的列名有在MySQL表中记录,所以实现流程就是

  • shell 脚本调用mysql读取出要在HBase中读取的列
  • 循环生成hbase11xreader读取-写入txt的DataX json
  • 调用DataX执行上面的json文件导出txt
  • 循环txt为每行加入MySQL要插入的列key
  • 生成读取txt,写入MySQL
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    #!/bin/bash

    # MySQL连接信息
    DB_USER=""
    DB_PASS=""
    DB_HOST=""
    DB_NAME=""

    # 执行的SQL语句
    SQL_QUERY="SELECT columnKey,otherColumn FROM KV;"

    # 执行SQL并将结果存储到关联数组
    result=($(mysql -u${DB_USER} -p${DB_PASS} -h${DB_HOST} -D${DB_NAME} -N -B -e "${SQL_QUERY}"))

    # 遍历结果,生成文件
    for ((i=0; i<${#result[@]}; i+=2)); do
    columnKey=${result[i]}
    otherColumn=${result[i+1]}

    # 生成JSON内容
    json_content="{
    \"job\": {
    \"setting\": {
    \"speed\": {
    \"channel\": 1
    }
    },
    \"content\": [
    {
    \"reader\": {
    \"name\": \"hbase11xreader\",
    \"parameter\": {
    \"hbaseConfig\": {
    \"hbase.zookeeper.quorum\": \"localhost\"
    },
    \"table\": \"Table\",
    \"encoding\": \"utf-8\",
    \"mode\": \"normal\",
    \"column\": [
    {
    \"name\": \"cf: column1\",
    \"type\": \"string\"
    },
    {
    \"name\": \"cf: column2\",
    \"type\": \"date\",
    \"format\":\"yyyy-MM-dd HH:mm:ss\"
    },
    {
    \"name\": \"cf: ${columnKey}\",
    \"type\": \"string\"
    }
    ],
    \"range\": {
    \"startRowkey\": \"\",
    \"endRowkey\": \"\",
    \"isBinaryRowkey\": true
    }
    }
    },
    \"writer\": {
    \"name\": \"txtfilewriter\",
    \"parameter\": {
    \"path\": \"datax_trans_file/tmp\",
    \"fileName\": \"dataCustom_${columnKey}_${otherColumn}\",
    \"writeMode\": \"truncate\",
    \"fieldDelimiter\": \"|\"
    }
    }
    }
    ]
    }
    }"

    # 将JSON内容写入文件
    echo "$json_content" > "datax_trans_file/dataCustom_${columnKey}_${otherColumn}_2txt.json"

    # 执行datax 生成txt文件
    dataxtool/datax/bin/datax.py datax_trans_file/dataCustom_${columnKey}_${otherColumn}_2txt.json

    # 读出对应txt 插入统计编码和名称

    done

    # 列出txt文件列表 插入数据


    # 遍历指定目录下的匹配文件,并处理文件内容
    for file in datax_trans_file/tmp/dataCustom_*; do
    if [[ -f "$file" ]]; then
    # 提取otherColumn和columnKey
    filename=$(basename "$file")
    IFS="_" read -ra parts <<< "$filename"
    columnKey="${parts[1]}"
    otherColumn="${parts[2]}"

    # 为每一行添加信息
    while IFS= read -r line; do
    echo "$line|$columnKey|$otherColumn|1"
    done < "$file" > "$file.tmp"

    # 将临时文件替换原文件
    mv "$file.tmp" "$file"

    echo "Added info to: $file"
    fi
    done

    # 执行写入mysql
    dataxtool/datax/bin/datax.py datax_trans_file/dataCustom2mysql.json
    02:00:00:00:00:00 0kb 301 302 APKParser AXMLPrinter2 Activity Android TV AndroidStudio Aviator Bean C++ Compiler DataBinding DataX DateFormat DebugNativeLibs Debugger Deferred Deep Link DownloadManager Dynamic Links EPASV Excel FTP FeignClient Gradle Gson HBase Hibernate Http Status Code HttpClient Interview JNI JVM Jenkins JitPack Kids Linux MGR Makefile Mock MySQL NDK NODE_ENV Native Node.js版本 OSS Override Phoenix Query Server 6.0.0 Qualifier Redis RestTemplate RestoreInstance SDK SDK更新给开发者 SPI ShardingSphere SimpleDateFormat Singleton Spring SpringBatch StepScope ThreadPoolExecutor Time Zone WEBGL_debug_renderer_info WIFI WebGL WebView XML YYYY _In_ _Out_ antd brew chrome cross-env dom4j experimental-webgl fit flavor fullgc github api hexo iOS ip libjpeg-turbo manifest mockjs module nvm onActivityForResult onPageFinished python scope so spawn v36 versionCode vue 中位数 中程数 主动push 人脸支付 众数 兼容问题 动态参数 同步支付 命名空间 均值 多形参 头文件 如何主动PUSH 宽表 异步支付 引流 支付 数据决策 春运交通情况 智能硬件 有缺陷的OpenSSL版本 权限 极差 构造函数 模板 横表 水印 用户体验 窄表 竖表 纵表 蓝牙 资费套餐 香港卡